This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e0248cc9a74 IGNITE-25909 SQL Calcite: Optimize memory consumption for
hash-table based operators - Fixes #12189.
e0248cc9a74 is described below
commit e0248cc9a74f6633eee7dd8a5e765d4ae1f70631
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Jul 25 16:05:18 2025 +0500
IGNITE-25909 SQL Calcite: Optimize memory consumption for hash-table based
operators - Fixes #12189.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../benchmarks/jmh/sql/JmhSqlBenchmark.java | 51 ++++++++++
.../query/calcite/exec/MappingRowHandler.java | 63 ++++++++++++
.../query/calcite/exec/RuntimeHashIndex.java | 48 ++++-----
.../query/calcite/exec/exp/agg/GroupKey.java | 110 ++++++++++++---------
.../query/calcite/exec/rel/AbstractSetOpNode.java | 29 +++---
.../query/calcite/exec/rel/HashAggregateNode.java | 74 ++++++--------
.../query/calcite/exec/rel/IntersectNode.java | 2 +-
.../query/calcite/exec/rel/MinusNode.java | 2 +-
.../calcite/exec/tracker/ObjectSizeCalculator.java | 2 +-
.../query/calcite/rel/set/IgniteMapSetOp.java | 3 +-
.../query/calcite/exec/rel/BaseAggregateTest.java | 3 +-
.../rel/HashAggregateSingleGroupExecutionTest.java | 3 +-
parent/pom.xml | 2 +-
13 files changed, 245 insertions(+), 147 deletions(-)
diff --git
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
index 78c0589f1d2..0698fb93327 100644
---
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
+++
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
@@ -251,6 +251,57 @@ public class JmhSqlBenchmark {
throw new AssertionError("Unexpected result: " + res.get(0));
}
+ /**
+ * Query with EXCEPT set op.
+ */
+ @Benchmark
+ public void queryExcept() {
+ int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+ List<?> res = executeSql(
+ "SELECT fld, fldIdx, fldBatch, fldIdxBatch FROM Item WHERE
fldIdxBatch=? " +
+ "EXCEPT " +
+ "SELECT fld + ?, fldIdx + ?, fldBatch, fldIdxBatch FROM Item
WHERE fldIdxBatch=?",
+ key / BATCH_SIZE, BATCH_SIZE / 2, BATCH_SIZE / 2, key /
BATCH_SIZE);
+
+ if (res.size() != BATCH_SIZE / 2)
+ throw new AssertionError("Unexpected result size: " + res.size());
+ }
+
+ /**
+ * Query with INTERSECT set op.
+ */
+ @Benchmark
+ public void queryIntersect() {
+ int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+ List<?> res = executeSql(
+ "SELECT fld, fldIdx, fldBatch, fldIdxBatch FROM Item WHERE
fldIdxBatch=? " +
+ "INTERSECT " +
+ "SELECT fld + ?, fldIdx + ?, fldBatch, fldIdxBatch FROM Item
WHERE fldIdxBatch=?",
+ key / BATCH_SIZE, BATCH_SIZE / 2, BATCH_SIZE / 2, key /
BATCH_SIZE);
+
+ if (res.size() != BATCH_SIZE / 2)
+ throw new AssertionError("Unexpected result size: " + res.size());
+ }
+
+ /**
+ * Query with correlated subquery.
+ */
+ @Benchmark
+ public void queryCorrelated() {
+ int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+ List<?> res = executeSql(
+ "SELECT fld FROM Item i0 WHERE fldIdxBatch=? AND EXISTS " +
+ "(SELECT 1 FROM Item i1 WHERE i0.fld = i1.fld + ? AND
i0.fldBatch = i1.fldBatch)",
+ key / BATCH_SIZE, BATCH_SIZE / 2);
+
+ // Skip result check for H2 engine, because query can't be executed
correctly on H2.
+ if (!"H2".equals(engine) && res.size() != BATCH_SIZE / 2)
+ throw new AssertionError("Unexpected result size: " + res.size());
+ }
+
/** */
private List<List<?>> executeSql(String sql, Object... args) {
return cache.query(new SqlFieldsQuery(sql).setArgs(args)).getAll();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java
new file mode 100644
index 00000000000..6d5a9b3f61b
--- /dev/null
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.lang.reflect.Type;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * Read only handler to process subset of columns.
+ */
+public class MappingRowHandler<Row> implements RowHandler<Row> {
+ /** */
+ private final RowHandler<Row> delegate;
+
+ /** */
+ private final int[] mapping;
+
+ /** */
+ public MappingRowHandler(RowHandler<Row> delegate, ImmutableBitSet
requiredColumns) {
+ this.delegate = delegate;
+ mapping = requiredColumns.toArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object get(int field, Row row) {
+ return delegate.get(mapping[field], row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int columnCount(Row row) {
+ return mapping.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void set(int field, Row row, Object val) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row concat(Row left, Row right) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public RowFactory<Row> factory(Type... types) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
index 82973c64a37..0e670e1da5f 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
@@ -25,27 +25,21 @@ import java.util.function.Supplier;
import org.apache.calcite.util.ImmutableBitSet;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Runtime hash index based on on-heap hash map.
*/
public class RuntimeHashIndex<Row> implements RuntimeIndex<Row> {
- /**
- * Placeholder for keys containing NULL values. Used to skip rows with
such keys, since condition NULL=NULL
- * should not satisfy the filter.
- */
- private static final GroupKey NULL_KEY = new
GroupKey(X.EMPTY_OBJECT_ARRAY);
-
/** */
protected final ExecutionContext<Row> ectx;
/** */
- private final ImmutableBitSet keys;
+ private final RowHandler<Row> keysRowHnd;
/** Rows. */
- private final HashMap<GroupKey, List<Row>> rows;
+ private final HashMap<GroupKey<Row>, List<Row>> rows;
/** Allow NULL values. */
private final boolean allowNulls;
@@ -59,19 +53,19 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
boolean allowNulls
) {
this.ectx = ectx;
+ this.allowNulls = allowNulls;
assert !F.isEmpty(keys);
- this.keys = keys;
- this.allowNulls = allowNulls;
+ keysRowHnd = new MappingRowHandler<>(ectx.rowHandler(), keys);
rows = new HashMap<>();
}
/** {@inheritDoc} */
@Override public void push(Row r) {
- GroupKey key = key(r);
+ GroupKey<Row> key = key(r);
- if (key == NULL_KEY)
+ if (key == null)
return;
List<Row> eqRows = rows.computeIfAbsent(key, k -> new ArrayList<>());
@@ -89,20 +83,20 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
return new IndexScan(searchRow);
}
- /** */
- private GroupKey key(Row r) {
- GroupKey.Builder b = GroupKey.builder(keys.cardinality());
-
- for (Integer field : keys) {
- Object fieldVal = ectx.rowHandler().get(field, r);
-
- if (fieldVal == null && !allowNulls)
- return NULL_KEY;
-
- b.add(fieldVal);
+ /**
+ * @return Group key for provided row. Can be {@code null} if key fields
of row contain NULL values.
+ * Since condition NULL=NULL in SQL should not satisfy the filter (but
nulls are allowed for
+ * IS NOT DISTINCT FROM condition).
+ */
+ private @Nullable GroupKey<Row> key(Row r) {
+ if (!allowNulls) {
+ for (int i = 0; i < keysRowHnd.columnCount(r); i++) {
+ if (keysRowHnd.get(i, r) == null)
+ return null;
+ }
}
- return b.build();
+ return new GroupKey<>(r, keysRowHnd);
}
/**
@@ -121,9 +115,9 @@ public class RuntimeHashIndex<Row> implements
RuntimeIndex<Row> {
/** {@inheritDoc} */
@NotNull @Override public Iterator<Row> iterator() {
- GroupKey key = key(searchRow.get());
+ GroupKey<Row> key = key(searchRow.get());
- if (key == NULL_KEY)
+ if (key == null)
return Collections.emptyIterator();
List<Row> eqRows = rows.get(key);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
index fbf9265ba4c..42fb13507b9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
@@ -17,86 +17,98 @@
package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;
-import java.util.Arrays;
-import org.apache.ignite.internal.util.typedef.X;
+import java.util.Objects;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
/**
*
*/
-public class GroupKey {
+public class GroupKey<Row> implements Binarylizable {
/** */
- public static final GroupKey EMPTY_GRP_KEY = new
GroupKey(X.EMPTY_OBJECT_ARRAY);
+ private Row row;
/** */
- private final Object[] fields;
+ private RowHandler<Row> hnd;
/** */
- public GroupKey(Object[] fields) {
- this.fields = fields;
+ public GroupKey(Row row, RowHandler<Row> hnd) {
+ this.row = row;
+ this.hnd = hnd;
}
/** */
- public Object[] fields() {
- return fields;
+ public Row row() {
+ return row;
+ }
+
+ /** */
+ public RowHandler<Row> rowHandler() {
+ return hnd;
}
/** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
+ @Override public void writeBinary(BinaryWriter writer) throws
BinaryObjectException {
+ BinaryRawWriter rawWriter = writer.rawWriter();
- GroupKey grpKey = (GroupKey)o;
+ int colCnt = hnd.columnCount(row);
- return Arrays.equals(fields, grpKey.fields);
+ rawWriter.writeInt(colCnt);
+
+ for (int i = 0; i < colCnt; i++)
+ rawWriter.writeObject(hnd.get(i, row));
}
/** {@inheritDoc} */
- @Override public int hashCode() {
- return Arrays.hashCode(fields);
- }
+ @Override public void readBinary(BinaryReader reader) throws
BinaryObjectException {
+ BinaryRawReader rawReader = reader.rawReader();
- /** */
- public static Builder builder(int rowLen) {
- return new Builder(rowLen);
- }
+ int colCnt = rawReader.readInt();
- /** */
- public static class Builder {
- /** */
- private final Object[] fields;
+ Object[] row0 = new Object[colCnt];
- /** */
- private int idx;
+ for (int i = 0; i < colCnt; i++)
+ row0[i] = rawReader.readObject();
- /** */
- private Builder(int rowLen) {
- fields = new Object[rowLen];
- }
+ row = (Row)row0;
+ hnd = (RowHandler<Row>)ArrayRowHandler.INSTANCE;
+ }
- /** */
- public Builder add(Object val) {
- if (idx == fields.length)
- throw new IndexOutOfBoundsException();
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
- fields[idx++] = val;
+ GroupKey<Row> other = (GroupKey<Row>)o;
- return this;
- }
+ int colCnt = hnd.columnCount(row);
- /** */
- public GroupKey build() {
- assert idx == fields.length;
+ if (colCnt != other.hnd.columnCount(other.row))
+ return false;
- return new GroupKey(fields);
+ for (int i = 0; i < colCnt; i++) {
+ if (!Objects.equals(hnd.get(i, row), other.hnd.get(i, other.row)))
+ return false;
}
- /** */
- public void clear() {
- Arrays.fill(fields, null);
+ return true;
+ }
- idx = 0;
- }
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int hashCode = 0;
+
+ for (int i = 0; i < hnd.columnCount(row); i++)
+ hashCode = hashCode * 31 + Objects.hashCode(hnd.get(i, row));
+
+ return hashCode;
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
index 9c5fbafb103..9b2f6fcd277 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
@@ -206,7 +206,7 @@ public abstract class AbstractSetOpNode<Row> extends
MemoryTrackingNode<Row> {
/** */
protected abstract static class Grouping<Row> {
/** */
- protected final Map<GroupKey, int[]> groups = new HashMap<>();
+ protected final Map<GroupKey<Row>, int[]> groups = new HashMap<>();
/** */
protected final RowHandler<Row> hnd;
@@ -261,15 +261,8 @@ public abstract class AbstractSetOpNode<Row> extends
MemoryTrackingNode<Row> {
}
/** */
- protected GroupKey key(Row row) {
- int size = hnd.columnCount(row);
-
- Object[] fields = new Object[size];
-
- for (int i = 0; i < size; i++)
- fields[i] = hnd.get(i, row);
-
- return new GroupKey(fields);
+ protected GroupKey<Row> key(Row row) {
+ return new GroupKey<>(row, hnd);
}
/** */
@@ -285,7 +278,7 @@ public abstract class AbstractSetOpNode<Row> extends
MemoryTrackingNode<Row> {
/** */
protected void addOnReducer(Row row) {
- GroupKey grpKey = (GroupKey)hnd.get(0, row);
+ GroupKey<Row> grpKey = key((Row)hnd.get(0, row));
int[] cntrsMap = (int[])hnd.get(1, row);
int[] cntrs = groups.computeIfAbsent(grpKey, k -> new
int[cntrsMap.length]);
@@ -298,17 +291,17 @@ public abstract class AbstractSetOpNode<Row> extends
MemoryTrackingNode<Row> {
/** */
protected List<Row> getOnMapper(int cnt) {
- Iterator<Map.Entry<GroupKey, int[]>> it =
groups.entrySet().iterator();
+ Iterator<Map.Entry<GroupKey<Row>, int[]>> it =
groups.entrySet().iterator();
int amount = Math.min(cnt, groups.size());
List<Row> res = new ArrayList<>(amount);
while (amount > 0 && it.hasNext()) {
- Map.Entry<GroupKey, int[]> entry = it.next();
+ Map.Entry<GroupKey<Row>, int[]> entry = it.next();
// Skip row if it doesn't affect the final result.
if (affectResult(entry.getValue())) {
- res.add(rowFactory.create(entry.getKey(),
entry.getValue()));
+ res.add(rowFactory.create(entry.getKey().row(),
entry.getValue()));
amount--;
}
@@ -321,16 +314,16 @@ public abstract class AbstractSetOpNode<Row> extends
MemoryTrackingNode<Row> {
/** */
protected List<Row> getOnSingleOrReducer(int cnt) {
- Iterator<Map.Entry<GroupKey, int[]>> it =
groups.entrySet().iterator();
+ Iterator<Map.Entry<GroupKey<Row>, int[]>> it =
groups.entrySet().iterator();
List<Row> res = new ArrayList<>(cnt);
while (it.hasNext() && cnt > 0) {
- Map.Entry<GroupKey, int[]> entry = it.next();
+ Map.Entry<GroupKey<Row>, int[]> entry = it.next();
- GroupKey key = entry.getKey();
+ GroupKey<Row> key = entry.getKey();
- Row row = rowFactory.create(key.fields());
+ Row row = key.row();
int[] cntrs = entry.getValue();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
index 39c8a1f74db..62985188cad 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
@@ -24,13 +24,13 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.IgniteException;
import
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import
org.apache.ignite.internal.processors.query.calcite.exec.MappingRowHandler;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
import
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
@@ -241,52 +241,36 @@ public class HashAggregateNode<Row> extends
AggregateNode<Row> {
private final ImmutableBitSet grpFields;
/** */
- private final Map<GroupKey, List<AccumulatorWrapper<Row>>> groups =
new HashMap<>();
+ private final Map<GroupKey<Row>, List<AccumulatorWrapper<Row>>> groups
= new HashMap<>();
/** */
- private final RowHandler<Row> handler;
+ private final RowHandler<Row> hnd;
/** */
- private GroupKey.Builder grpKeyBld;
+ private final RowHandler<Row> keyGrpRowHnd;
/** */
- private final BiFunction<GroupKey, List<AccumulatorWrapper<Row>>,
List<AccumulatorWrapper<Row>>> getOrCreateGroup;
-
- /** */
- private final Function<GroupKey, List<AccumulatorWrapper<Row>>>
createGroup;
+ private final Function<GroupKey<Row>, List<AccumulatorWrapper<Row>>>
createGrp;
/** */
private Grouping(byte grpId, ImmutableBitSet grpFields) {
this.grpId = grpId;
this.grpFields = grpFields;
- grpKeyBld = GroupKey.builder(grpFields.cardinality());
- handler = context().rowHandler();
-
- createGroup = (k) -> create();
-
- getOrCreateGroup = (k, v) -> {
- if (v == null) {
- grpKeyBld = GroupKey.builder(grpFields.cardinality());
-
- return create();
- }
- else {
- grpKeyBld.clear();
+ hnd = context().rowHandler();
+ keyGrpRowHnd = new MappingRowHandler<>(hnd, grpFields);
- return v;
- }
- };
+ createGrp = k -> create();
init();
}
/** */
private void init() {
- // Initializes aggregates for case when no any rows will be added
into the aggregate to have 0 as result.
- // Doesn't do it for MAP type due to we don't want send from MAP
node zero results because it looks redundant.
+ // Initializes aggregates so they return 0 even if no rows are
added. However, this initialization
+ // doesn't apply to MAP types since sending zero results from MAP
nodes looks redundant
if (grpFields.isEmpty() && (type == AggregateType.REDUCE || type
== AggregateType.SINGLE))
- groups.put(GroupKey.EMPTY_GRP_KEY, create());
+ groups.put(key(rowFactory.create()), create());
}
/** */
@@ -319,11 +303,13 @@ public class HashAggregateNode<Row> extends
AggregateNode<Row> {
}
/** */
- private void addOnMapper(Row row) {
- for (Integer field : grpFields)
- grpKeyBld.add(handler.get(field, row));
+ private GroupKey<Row> key(Row row) {
+ return new GroupKey<>(row, keyGrpRowHnd);
+ }
- List<AccumulatorWrapper<Row>> wrappers =
groups.compute(grpKeyBld.build(), getOrCreateGroup);
+ /** */
+ private void addOnMapper(Row row) {
+ List<AccumulatorWrapper<Row>> wrappers =
groups.computeIfAbsent(key(row), createGrp);
for (AccumulatorWrapper<Row> wrapper : wrappers)
wrapper.add(row);
@@ -331,15 +317,15 @@ public class HashAggregateNode<Row> extends
AggregateNode<Row> {
/** */
private void addOnReducer(Row row) {
- byte targetGrpId = (byte)handler.get(0, row);
+ byte targetGrpId = (byte)hnd.get(0, row);
if (targetGrpId != grpId)
return;
- GroupKey grpKey = (GroupKey)handler.get(1, row);
+ GroupKey<Row> grpKey = (GroupKey<Row>)hnd.get(1, row);
- List<AccumulatorWrapper<Row>> wrappers =
groups.computeIfAbsent(grpKey, createGroup);
- Accumulator<Row>[] accums = hasAccumulators() ?
(Accumulator<Row>[])handler.get(2, row) : null;
+ List<AccumulatorWrapper<Row>> wrappers =
groups.computeIfAbsent(grpKey, createGrp);
+ Accumulator<Row>[] accums = hasAccumulators() ?
(Accumulator<Row>[])hnd.get(2, row) : null;
for (int i = 0; i < wrappers.size(); i++) {
AccumulatorWrapper<Row> wrapper = wrappers.get(i);
@@ -351,15 +337,15 @@ public class HashAggregateNode<Row> extends
AggregateNode<Row> {
/** */
private List<Row> getOnMapper(int cnt) {
- Iterator<Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>>> it =
groups.entrySet().iterator();
+ Iterator<Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>>>
it = groups.entrySet().iterator();
int amount = Math.min(cnt, groups.size());
List<Row> res = new ArrayList<>(amount);
for (int i = 0; i < amount; i++) {
- Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>> entry =
it.next();
+ Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>> entry
= it.next();
- GroupKey grpKey = entry.getKey();
+ GroupKey<Row> grpKey = entry.getKey();
if (hasAccumulators()) {
List<AccumulatorWrapper<Row>> wrappers = entry.getValue();
Accumulator<Row>[] accums = new
Accumulator[wrappers.size()];
@@ -380,25 +366,23 @@ public class HashAggregateNode<Row> extends
AggregateNode<Row> {
/** */
private List<Row> getOnReducer(int cnt) {
- Iterator<Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>>> it =
groups.entrySet().iterator();
+ Iterator<Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>>>
it = groups.entrySet().iterator();
int amount = Math.min(cnt, groups.size());
List<Row> res = new ArrayList<>(amount);
for (int i = 0; i < amount; i++) {
- Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>> entry =
it.next();
+ Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>> entry
= it.next();
- GroupKey grpKey = entry.getKey();
+ GroupKey<Row> grpKey = entry.getKey();
List<AccumulatorWrapper<Row>> wrappers = entry.getValue();
Object[] fields = new Object[grpSet.cardinality() +
wrappers.size()];
int j = 0, k = 0;
- Object[] keyFields = grpKey.fields();
-
- for (Integer field : grpSet)
- fields[j++] = grpFields.get(field) ? keyFields[k++] : null;
+ for (int field = grpSet.nextSetBit(0); field >= 0; field =
grpSet.nextSetBit(field + 1))
+ fields[j++] = grpFields.get(field) ?
grpKey.rowHandler().get(k++, grpKey.row()) : null;
for (AccumulatorWrapper<Row> wrapper : wrappers)
fields[j++] = wrapper.end();
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
index cbeb7cdfc4c..1b4aa08a6b1 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
@@ -58,7 +58,7 @@ public class IntersectNode<Row> extends
AbstractSetOpNode<Row> {
@Override protected void addOnSingle(Row row, int setIdx) {
int[] cntrs;
- GroupKey key = key(row);
+ GroupKey<Row> key = key(row);
if (setIdx == 0) {
cntrs = groups.computeIfAbsent(key, k -> new int[inputsCnt]);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
index d289141324e..a989b050008 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
@@ -44,7 +44,7 @@ public class MinusNode<Row> extends AbstractSetOpNode<Row> {
@Override protected void addOnSingle(Row row, int setIdx) {
int[] cntrs;
- GroupKey key = key(row);
+ GroupKey<Row> key = key(row);
if (setIdx == 0) {
// Value in the map will always have 2 elements, first - count
of keys in the first set,
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
index 4937bc64f69..1486a1ed862 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
@@ -104,7 +104,7 @@ public class ObjectSizeCalculator<Row> {
addSysClsSize(cls, (c, bo) -> szFunc.applyAsInt(bo)));
// Other.
- addSysClsSize(GroupKey.class, (c, k) -> c.sizeOf0(k.fields(), true));
+ addSysClsSize(GroupKey.class, (c, k) -> c.sizeOf0(k.row(), true));
addSysClsSize(UUID.class, null);
addSysClsSize(BigDecimal.class, (c, bd) ->
c.sizeOf0(bd.unscaledValue(), true));
long intArrOffset = GridUnsafe.arrayBaseOffset(int[].class);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
index f38ec09b1b5..d840f4ad0e2 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.util.Pair;
import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
-import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
import
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
import
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
import
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -94,7 +93,7 @@ public interface IgniteMapSetOp extends IgniteSetOp {
RelDataTypeFactory.Builder builder = new
RelDataTypeFactory.Builder(typeFactory);
- builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
+ builder.add("ROW", getInput(0).getRowType());
builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
return builder.build();
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
index 1552708e2cc..b0bfe89e976 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
@@ -43,6 +43,7 @@ import
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Aggregat
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Assert;
@@ -666,7 +667,7 @@ public abstract class BaseAggregateTest extends
AbstractExecutionTest {
/** */
@Override public Object[] create() {
- throw new AssertionError();
+ return X.EMPTY_OBJECT_ARRAY;
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 2098910ffd3..1b4ee59c5bb 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -38,6 +38,7 @@ import
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHash
import
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Before;
import org.junit.Test;
@@ -518,7 +519,7 @@ public class HashAggregateSingleGroupExecutionTest extends
AbstractExecutionTest
/** */
@Override public Object[] create() {
- throw new AssertionError();
+ return X.EMPTY_OBJECT_ARRAY;
}
/** */
diff --git a/parent/pom.xml b/parent/pom.xml
index 1d7cc72c2ee..74f1c052c1d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -86,7 +86,7 @@
<jetbrains.annotations.version>16.0.3</jetbrains.annotations.version>
<jetty.version>11.0.24</jetty.version>
<jetty-jakarta-servlet-api.version>5.0.2</jetty-jakarta-servlet-api.version>
- <jmh.version>1.36</jmh.version>
+ <jmh.version>1.37</jmh.version>
<jna.version>4.5.2</jna.version>
<jnr.posix.version>3.1.15</jnr.posix.version>
<jotm.version>2.3.1-M1</jotm.version>