Copilot commented on code in PR #17210: URL: https://github.com/apache/iotdb/pull/17210#discussion_r2850473160
########## iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TopKRankingOperatorTest.java: ########## @@ -0,0 +1,696 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process.window; + +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; +import org.apache.iotdb.db.queryengine.common.PlanFragmentId; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.execution.driver.DriverContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.TreeLinearFillOperator; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKRankingNode; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.read.common.block.TsBlockBuilder; +import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.tsfile.utils.Binary; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; +import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TopKRankingOperatorTest { + private static final ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool( + 1, "topKRankingOperator-test-instance-notification"); + + @Test + public void testTopKWithPartition() { + // Input: 4 rows for d1, 3 rows for d2 + // Sort by value (column 2) ascending, top 2 per partition + long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 4, 6, 2, 1}}; + + // Expected: top 2 per partition sorted by value ASC + // d1: value=1(rn=1), value=3(rn=2) + // d2: value=1(rn=1), value=2(rn=2) + Map<String, List<int[]>> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {1, 1}, new int[] {2, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + 
valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + expectedByDevice, + 4); + } + + @Test + public void testTopKWithPartitionDescending() { + long[][] timeArray = {{1, 2, 3, 4, 5, 6}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 6, 2, 4}}; + + // top 2 per partition sorted by value DESC + // d1: value=5(rn=1), value=3(rn=2) + // d2: value=6(rn=1), value=4(rn=2) + Map<String, List<int[]>> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {5, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {6, 1}, new int[] {4, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.DESC_NULLS_LAST), + 2, + false, + expectedByDevice, + 4); + } + + @Test + public void testTopKWithoutPartition() { + // No partition: all rows in one group + long[][] timeArray = {{1, 2, 3, 4, 5}}; + String[][] deviceArray = {{"d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 1, 4, 2}}; + + // top 3 globally sorted by value ASC: value=1(rn=1), value=2(rn=2), value=3(rn=3) + int[][] expectedValueAndRn = {{1, 1}, {2, 2}, {3, 3}}; + + verifyTopKResultsGlobal( + timeArray, + deviceArray, + valueArray, + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 3, + false, + expectedValueAndRn, + 3); + } + + @Test + public void testTopKWithMultipleTsBlocks() { + long[][] timeArray = {{1, 2, 3}, {4, 5}, {6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1"}, {"d2", "d2"}, {"d2", "d2"}}; + int[][] valueArray = {{5, 3, 1}, {6, 2}, {4, 1}}; + + // top 2 per partition sorted by value ASC 
+ // d1: value=1(rn=1), value=3(rn=2) + // d2: value=1(rn=1), value=2(rn=2) + Map<String, List<int[]>> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] {1, 1}, new int[] {2, 2})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + expectedByDevice, + 4); + } + + @Test + public void testTopKWithTopOne() { + long[][] timeArray = {{1, 2, 3, 4}}; + String[][] deviceArray = {{"d1", "d1", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 6, 2}}; + + // top 1 per partition sorted by value ASC + // d1: value=3(rn=1) + // d2: value=2(rn=1) + Map<String, List<int[]>> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Collections.singletonList(new int[] {3, 1})); + expectedByDevice.put("d2", Collections.singletonList(new int[] {2, 1})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 1, + false, + expectedByDevice, + 2); + } + + // ==================== RANK Tests ==================== + + @Test + public void testRankWithPartitionAndTies() { + // d1: values [5, 3, 3, 1], d2: values [6, 2, 2] + // topN=2 ASC → d1 keeps rank≤2: 1(r=1),3(r=2),3(r=2); d2 keeps rank≤2: 2(r=1),2(r=1) + long[][] timeArray = {{1, 2, 3, 4, 5, 6, 7}}; + String[][] deviceArray = {{"d1", "d1", "d1", "d1", "d2", "d2", "d2"}}; + int[][] valueArray = {{5, 3, 3, 1, 6, 2, 2}}; + + Map<String, List<int[]>> expectedByDevice = new HashMap<>(); + expectedByDevice.put("d1", Arrays.asList(new int[] {1, 1}, new int[] {3, 2}, new int[] {3, 2})); + expectedByDevice.put("d2", Arrays.asList(new int[] 
{2, 1}, new int[] {2, 1})); + + verifyTopKResultsByPartition( + timeArray, + deviceArray, + valueArray, + Collections.singletonList(1), + Collections.singletonList(TSDataType.TEXT), + Collections.singletonList(2), + Collections.singletonList(SortOrder.ASC_NULLS_LAST), + 2, + false, + TopKRankingNode.RankingType.RANK, + expectedByDevice, + 5); + } + + @Test + public void testRankWithPartitionDescendingAndTies() { + // d1: values [5, 3, 3, 1] DESC → 5(r=1),3(r=2),3(r=2),1(r=4) → keep rank≤2 + // d2: values [6, 2, 4] DESC → 6(r=1),4(r=2),2(r=3) → keep rank≤2 Review Comment: The comment incorrectly describes the data for d2. The comment states "d2: values [6, 2, 4]" but looking at the actual test data arrays, d2 only has two values: 6 (at index 4) and 4 (at index 5). There is no value 2 for d2 in this test case. The comment should be corrected to "d2: values [6, 4] DESC → 6(r=1),4(r=2) → keep rank≤2". ```suggestion // d2: values [6, 4] DESC → 6(r=1),4(r=2) → keep rank≤2 ``` ########## integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunction3IT.java: ########## @@ -166,6 +166,84 @@ public void testReplaceWindowWithRowNumber() { DATABASE_NAME); } + @Test + public void testPushDownFilterIntoWindowWithRank() { + // Data: d1 values {3,5,3,1}, d2 values {2,4} + // rank(PARTITION BY device ORDER BY value): + // d1: 1.0→rank=1, 3.0→rank=2, 3.0→rank=2, 5.0→rank=4 + // d2: 2.0→rank=1, 4.0→rank=2 + // WHERE rk <= 2: keeps d1 rows with rank≤2 (3 rows due to tie), d2 all (2 rows) + String[] expectedHeader = new String[] {"time", "device", "value", "rk"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,2,", + "2021-01-01T09:09:00.000Z,d1,3.0,2,", + "2021-01-01T09:10:00.000Z,d1,1.0,1,", + "2021-01-01T09:08:00.000Z,d2,2.0,1,", + "2021-01-01T09:15:00.000Z,d2,4.0,2,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM demo) WHERE rk <= 2 ORDER BY device, time", + 
expectedHeader, + retArray, + DATABASE_NAME); + } + + @Test + public void testPushDownLimitIntoWindowWithRank() { + // TopKRanking(RANK, topN=2) keeps rank≤2 per partition, then LIMIT 2 on final result + // d1 rank≤2: 1.0(r=1), 3.0(r=2), 3.0(r=2) → 3 rows sorted by time: 09:05,09:09,09:10 + // d2 rank≤2: 2.0(r=1), 4.0(r=2) → 2 rows + // ORDER BY device, time LIMIT 2 → first 2 from d1 + String[] expectedHeader = new String[] {"time", "device", "value", "rk"}; + String[] retArray = + new String[] { + "2021-01-01T09:05:00.000Z,d1,3.0,2,", "2021-01-01T09:07:00.000Z,d1,5.0,4,", + }; + tableResultSetEqualTest( + "SELECT * FROM (SELECT *, rank() OVER (PARTITION BY device ORDER BY value) as rk FROM demo) ORDER BY device, time LIMIT 2", + expectedHeader, + retArray, + DATABASE_NAME); + } Review Comment: The test comment and expected result are inconsistent with the SQL query. The comment states "TopKRanking(RANK, topN=2) keeps rank≤2 per partition", but the SQL query does not have a WHERE clause to filter by rank. Without such a filter, all rows should be returned with their computed rank values, not just those with rank≤2. The expected result showing value=5 with rank=4 is correct for the query without a WHERE clause, but contradicts the comment. Either add a WHERE clause like "WHERE rk <= 2" to match the comment, or update the comment to correctly describe that this test computes ranks for all rows and then applies LIMIT 2 to the final result set after ordering by device and time. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRankAccumulator.java: ########## @@ -0,0 +1,754 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArrayFIFOQueue; +import org.apache.iotdb.db.utils.HeapTraversal; + +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.function.LongConsumer; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class GroupedTopNRankAccumulator { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(GroupedTopNRankAccumulator.class); + private static final long UNKNOWN_INDEX = -1; + private static final long NULL_GROUP_ID = -1; + + private final GroupIdToHeapBuffer groupIdToHeapBuffer = new GroupIdToHeapBuffer(); + private final HeapNodeBuffer heapNodeBuffer = new HeapNodeBuffer(); + private final PeerGroupBuffer peerGroupBuffer = new PeerGroupBuffer(); + private final HeapTraversal heapTraversal = new HeapTraversal(); + + // Map from (Group ID, Row Value) to Heap Node Index where the value is stored + private final 
TopNPeerGroupLookup peerGroupLookup; + + private final RowIdComparisonHashStrategy strategy; + private final int topN; + private final LongConsumer rowIdEvictionListener; + + public GroupedTopNRankAccumulator( + RowIdComparisonHashStrategy strategy, int topN, LongConsumer rowIdEvictionListener) { + this.strategy = requireNonNull(strategy, "strategy is null"); + this.peerGroupLookup = new TopNPeerGroupLookup(10_000, strategy, NULL_GROUP_ID, UNKNOWN_INDEX); + checkArgument(topN > 0, "topN must be greater than zero"); + this.topN = topN; + this.rowIdEvictionListener = + requireNonNull(rowIdEvictionListener, "rowIdEvictionListener is null"); + } + + public long sizeOf() { + return INSTANCE_SIZE + + groupIdToHeapBuffer.sizeOf() + + heapNodeBuffer.sizeOf() + + peerGroupBuffer.sizeOf() + + heapTraversal.sizeOf() + + peerGroupLookup.sizeOf(); + } + + public int findFirstPositionToAdd( + TsBlock newPage, + int groupCount, + int[] groupIds, + TsBlockWithPositionComparator comparator, + RowReferenceTsBlockManager pageManager) { + int currentGroups = groupIdToHeapBuffer.getTotalGroups(); + groupIdToHeapBuffer.allocateGroupIfNeeded(groupCount); + + for (int position = 0; position < newPage.getPositionCount(); position++) { + int groupId = groupIds[position]; + if (groupId >= currentGroups || groupIdToHeapBuffer.getHeapValueCount(groupId) < topN) { + return position; + } + long heapRootNodeIndex = groupIdToHeapBuffer.getHeapRootNodeIndex(groupId); + if (heapRootNodeIndex == UNKNOWN_INDEX) { + return position; + } + long rightPageRowId = peekRootRowIdByHeapNodeIndex(heapRootNodeIndex); + TsBlock rightPage = pageManager.getTsBlock(rightPageRowId); + int rightPosition = pageManager.getPosition(rightPageRowId); Review Comment: Inconsistent naming convention: parameters and variables use "page" terminology (newPage, pageManager, rightPage) while the codebase typically uses "TsBlock" terminology. 
For consistency with the rest of the IoTDB codebase, consider renaming: newPage → newTsBlock, pageManager → tsBlockManager, rightPage → rightTsBlock. The GroupedTopNRankBuilder already uses the consistent naming (tsBlockManager, tsBlock), so this file should follow the same pattern. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SortElimination.java: ########## @@ -139,6 +141,26 @@ public PlanNode visitPatternRecognition(PatternRecognitionNode node, Context con context.setCannotEliminateSort(true); return newNode; } + + @Override + public PlanNode visitTopKRanking(TopKRankingNode node, Context context) { + PlanNode newNode = node.clone(); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, context)); + } + context.setCannotEliminateSort(true); + return newNode; + } + + @Override + public PlanNode visitRowNumber(RowNumberNode node, Context context) { + PlanNode newNode = node.clone(); + for (PlanNode child : node.getChildren()) { + newNode.addChild(child.accept(this, context)); + } + context.setCannotEliminateSort(true); + return newNode; + } Review Comment: The implementation correctly prevents sort elimination for TopKRanking and RowNumber nodes, but there's a comment at line 169-173 (not shown in diff but nearby) that lists only 3 situations where sort cannot be eliminated. With these new visitor methods, there are now 5 situations. Consider updating that comment to include TopKRanking and RowNumber nodes, or generalize it to avoid becoming stale when new cases are added. ########## iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/TopNPeerGroupLookup.java: ########## @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator; + +import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.array.LongBigArray; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +/** Optimized hash table for streaming Top N peer group lookup operations. */ +// Note: this code was forked from fastutil (http://fastutil.di.unimi.it/) +// Long2LongOpenCustomHashMap. +// Copyright (C) 2002-2019 Sebastiano Vigna +public class TopNPeerGroupLookup { + private static final long INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TopNPeerGroupLookup.class); + + /** The buffer containing key and value data. */ + private Buffer buffer; + + /** The mask for wrapping a position counter. */ + private long mask; + + /** The hash strategy. */ + private final RowIdHashStrategy strategy; + + /** The current allocated table size. */ + private long tableSize; + + /** Threshold after which we rehash. */ + private long maxFill; + + /** The acceptable load factor. */ + private final float fillFactor; + + /** Number of entries in the set. */ + private long entryCount; + + /** + * The value denoting unmapped group IDs. 
Since group IDs need to co-exist at all times with row + * IDs, we only need to use one of the two IDs to indicate that a slot is unused. Group IDs have + * been arbitrarily selected for that purpose. + */ + private final long unmappedGroupId; + + /** The default return value for {@code get()}, {@code put()} and {@code remove()}. */ + private final long defaultReturnValue; + + /** + * Standard hash table parameters are expected. {@code unmappedGroupId} specifies the internal + * marker value for unmapped group IDs. + */ + public TopNPeerGroupLookup( + long expected, + float fillFactor, + RowIdHashStrategy strategy, + long unmappedGroupId, + long defaultReturnValue) { + checkArgument(expected >= 0, "The expected number of elements must be nonnegative"); + checkArgument( + fillFactor > 0 && fillFactor <= 1, + "Load factor must be greater than 0 and smaller than or equal to 1"); + this.fillFactor = fillFactor; + this.strategy = requireNonNull(strategy, "strategy is null"); + this.unmappedGroupId = unmappedGroupId; + this.defaultReturnValue = defaultReturnValue; + + tableSize = bigArraySize(expected, fillFactor); + mask = tableSize - 1; + maxFill = maxFill(tableSize, fillFactor); + buffer = new Buffer(tableSize, unmappedGroupId); + } + + public TopNPeerGroupLookup( + long expected, RowIdHashStrategy strategy, long unmappedGroupId, long defaultReturnValue) { + this(expected, 0.75f, strategy, unmappedGroupId, defaultReturnValue); + } + + /** Returns the size of this hash map in bytes. 
*/ + public long sizeOf() { + return INSTANCE_SIZE + buffer.sizeOf(); + } + + public long size() { + return entryCount; + } + + public boolean isEmpty() { + return entryCount == 0; + } + + public long get(long groupId, long rowId) { + checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID"); + + long hash = hash(groupId, rowId); + long index = hash & mask; + if (buffer.isEmptySlot(index)) { + return defaultReturnValue; + } + if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) { + return buffer.getValue(index); + } + // There's always an unused entry. + while (true) { + index = (index + 1) & mask; + if (buffer.isEmptySlot(index)) { + return defaultReturnValue; + } + if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) { + return buffer.getValue(index); + } + } + } + + public long get(long groupId, RowReference rowReference) { + checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID"); + + long hash = hash(groupId, rowReference); + long index = hash & mask; + if (buffer.isEmptySlot(index)) { + return defaultReturnValue; + } + if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowReference, index)) { + return buffer.getValue(index); + } + // There's always an unused entry. 
+ while (true) { + index = (index + 1) & mask; + if (buffer.isEmptySlot(index)) { + return defaultReturnValue; + } + if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowReference, index)) { + return buffer.getValue(index); + } + } + } + + public long put(long groupId, long rowId, long value) { + checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID"); + + long hash = hash(groupId, rowId); + + long index = find(groupId, rowId, hash); + if (index < 0) { + insert(twosComplement(index), groupId, rowId, hash, value); + return defaultReturnValue; + } + long oldValue = buffer.getValue(index); + buffer.setValue(index, value); + return oldValue; + } + + private long hash(long groupId, long rowId) { + return mix(groupId * 31 + strategy.hashCode(rowId)); + } + + private long hash(long groupId, RowReference rowReference) { + return mix(groupId * 31 + rowReference.hash(strategy)); + } + + private boolean equals(long groupId, long rowId, long index) { + return groupId == buffer.getGroupId(index) && strategy.equals(rowId, buffer.getRowId(index)); + } + + private boolean equals(long groupId, RowReference rowReference, long index) { + return groupId == buffer.getGroupId(index) + && rowReference.equals(strategy, buffer.getRowId(index)); + } + + private void insert(long index, long groupId, long rowId, long precomputedHash, long value) { + buffer.set(index, groupId, rowId, precomputedHash, value); + entryCount++; + if (entryCount > maxFill) { + rehash(bigArraySize(entryCount + 1, fillFactor)); + } + } + + /** + * Locate the index for the specified {@code groupId} and {@code rowId} key pair. If the index is + * unpopulated, then return the index as the two's complement value (which will be negative). 
+ */ + private long find(long groupId, long rowId, long precomputedHash) { + long index = precomputedHash & mask; + if (buffer.isEmptySlot(index)) { + return twosComplement(index); + } + if (precomputedHash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) { + return index; + } + // There's always an unused entry. + while (true) { + index = (index + 1) & mask; + if (buffer.isEmptySlot(index)) { + return twosComplement(index); + } + if (precomputedHash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) { + return index; + } + } + } + + public long remove(long groupId, long rowId) { + checkArgument(groupId != unmappedGroupId, "Group ID cannot be the unmapped group ID"); + + long hash = hash(groupId, rowId); + long index = hash & mask; + if (buffer.isEmptySlot(index)) { + return defaultReturnValue; + } + if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) { + return removeEntry(index); + } + while (true) { + index = (index + 1) & mask; + if (buffer.isEmptySlot(index)) { + return defaultReturnValue; + } + if (hash == buffer.getPrecomputedHash(index) && equals(groupId, rowId, index)) { + return removeEntry(index); + } + } + } + + private long removeEntry(long index) { + long oldValue = buffer.getValue(index); + entryCount--; + shiftKeys(index); + return oldValue; + } + + /** + * Shifts left entries with the specified hash code, starting at the specified index, and empties + * the resulting free entry. + * + * @param index a starting position. + */ + private void shiftKeys(long index) { + // Shift entries with the same hash. + while (true) { + long currentHash; + + long initialIndex = index; + index = ((index) + 1) & mask; + while (true) { + if (buffer.isEmptySlot(index)) { + buffer.clear(initialIndex); + return; + } + currentHash = buffer.getPrecomputedHash(index); + long slot = currentHash & mask; + // Yes, this is dense logic. See fastutil Long2LongOpenCustomHashMap#shiftKeys + // implementation. 
+ if (initialIndex <= index + ? initialIndex >= slot || slot > index + : initialIndex >= slot && slot > index) { + break; + } + index = (index + 1) & mask; + } + buffer.set( + initialIndex, + buffer.getGroupId(index), + buffer.getRowId(index), + currentHash, + buffer.getValue(index)); + } + } + + private void rehash(long newTableSize) { + long newMask = newTableSize - 1; // Note that this is used by the hashing macro + Buffer newBuffer = new Buffer(newTableSize, unmappedGroupId); + long index = tableSize; + for (long i = entryCount; i > 0; i--) { + index--; + while (buffer.isEmptySlot(index)) { + index--; + } + long hash = buffer.getPrecomputedHash(index); + long newIndex = hash & newMask; + if (!newBuffer.isEmptySlot(newIndex)) { + newIndex = (newIndex + 1) & newMask; + while (!newBuffer.isEmptySlot(newIndex)) { + newIndex = (newIndex + 1) & newMask; + } + } + newBuffer.set( + newIndex, buffer.getGroupId(index), buffer.getRowId(index), hash, buffer.getValue(index)); + } + tableSize = newTableSize; + mask = newMask; + maxFill = maxFill(tableSize, fillFactor); + buffer = newBuffer; + } + + private static long twosComplement(long value) { + return -(value + 1); + } + + private static class Buffer { + private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(Buffer.class); + + private static final int POSITIONS_PER_ENTRY = 4; + private static final int ROW_ID_OFFSET = 1; + private static final int PRECOMPUTED_HASH_OFFSET = 2; + private static final int VALUE_OFFSET = 3; + + /* + * Memory layout: + * [LONG] groupId1, [LONG] rowId1, [LONG] precomputedHash1, [LONG] value1 + * [LONG] groupId2, [LONG] rowId2, [LONG] precomputedHash2, [LONG] value2 + * ... 
+ */ + private final LongBigArray buffer; + private final long unmappedGroupId; + + public Buffer(long positions, long unmappedGroupId) { + buffer = new LongBigArray(unmappedGroupId); + buffer.ensureCapacity(positions * POSITIONS_PER_ENTRY); + this.unmappedGroupId = unmappedGroupId; + } + + public void set(long index, long groupId, long rowId, long precomputedHash, long value) { + buffer.set(index * POSITIONS_PER_ENTRY, groupId); + buffer.set(index * POSITIONS_PER_ENTRY + ROW_ID_OFFSET, rowId); + buffer.set(index * POSITIONS_PER_ENTRY + PRECOMPUTED_HASH_OFFSET, precomputedHash); + buffer.set(index * POSITIONS_PER_ENTRY + VALUE_OFFSET, value); + } + + public void clear(long index) { + // Since all fields of an index are set/unset together as a unit, we only need to choose one + // field to serve + // as a marker for empty slots. Group IDs have been arbitrarily selected for that purpose. + buffer.set(index * POSITIONS_PER_ENTRY, unmappedGroupId); + } + + public boolean isEmptySlot(long index) { + return getGroupId(index) == unmappedGroupId; + } + + public long getGroupId(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY); + } + + public long getRowId(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + ROW_ID_OFFSET); + } + + public long getPrecomputedHash(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + PRECOMPUTED_HASH_OFFSET); + } + + public long getValue(long index) { + return buffer.get(index * POSITIONS_PER_ENTRY + VALUE_OFFSET); + } + + public void setValue(long index, long value) { + buffer.set(index * POSITIONS_PER_ENTRY + VALUE_OFFSET, value); + } + + public long sizeOf() { + return INSTANCE_SIZE + buffer.sizeOf(); + } + } + + public static long maxFill(long n, float f) { + return Math.min((long) Math.ceil((double) ((float) n * f)), n - 1L); + } + + public static long nextPowerOfTwo(long x) { + return 1L << 64 - Long.numberOfLeadingZeros(x - 1L); Review Comment: The operator precedence in this expression is potentially 
problematic. The subtraction operator has higher precedence than the left shift operator, so this is evaluated as `1L << (64 - Long.numberOfLeadingZeros(x - 1L))`. While this is likely the intended behavior, it would be clearer and safer to add explicit parentheses: `1L << (64 - Long.numberOfLeadingZeros(x - 1L))`. Additionally, when x is 1, x-1 is 0, and Long.numberOfLeadingZeros(0) returns 64, resulting in `1L << (64 - 64)` = `1L << 0` = 1, which is correct. However, a shift distance of 64 or more does not produce a mathematically shifted result in Java: the behavior is well defined, but only the lower 6 bits of the shift distance are used for long values (so `1L << 64` evaluates to `1L`), so inputs that yield such a shift distance should be handled carefully. ```suggestion return 1L << (64 - Long.numberOfLeadingZeros(x - 1L)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
