This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e0248cc9a74 IGNITE-25909 SQL Calcite: Optimize memory consumption for hash-table based operators - Fixes #12189.
e0248cc9a74 is described below

commit e0248cc9a74f6633eee7dd8a5e765d4ae1f70631
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Fri Jul 25 16:05:18 2025 +0500

    IGNITE-25909 SQL Calcite: Optimize memory consumption for hash-table based operators - Fixes #12189.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../benchmarks/jmh/sql/JmhSqlBenchmark.java        |  51 ++++++++++
 .../query/calcite/exec/MappingRowHandler.java      |  63 ++++++++++++
 .../query/calcite/exec/RuntimeHashIndex.java       |  48 ++++-----
 .../query/calcite/exec/exp/agg/GroupKey.java       | 110 ++++++++++++---------
 .../query/calcite/exec/rel/AbstractSetOpNode.java  |  29 +++---
 .../query/calcite/exec/rel/HashAggregateNode.java  |  74 ++++++--------
 .../query/calcite/exec/rel/IntersectNode.java      |   2 +-
 .../query/calcite/exec/rel/MinusNode.java          |   2 +-
 .../calcite/exec/tracker/ObjectSizeCalculator.java |   2 +-
 .../query/calcite/rel/set/IgniteMapSetOp.java      |   3 +-
 .../query/calcite/exec/rel/BaseAggregateTest.java  |   3 +-
 .../rel/HashAggregateSingleGroupExecutionTest.java |   3 +-
 parent/pom.xml                                     |   2 +-
 13 files changed, 245 insertions(+), 147 deletions(-)

diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
index 78c0589f1d2..0698fb93327 100644
--- 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/sql/JmhSqlBenchmark.java
@@ -251,6 +251,57 @@ public class JmhSqlBenchmark {
             throw new AssertionError("Unexpected result: " + res.get(0));
     }
 
+    /**
+     * Query with EXCEPT set op.
+     */
+    @Benchmark
+    public void queryExcept() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql(
+            "SELECT fld, fldIdx, fldBatch, fldIdxBatch FROM Item WHERE 
fldIdxBatch=? " +
+                "EXCEPT " +
+                "SELECT fld + ?, fldIdx + ?, fldBatch, fldIdxBatch FROM Item 
WHERE fldIdxBatch=?",
+            key / BATCH_SIZE, BATCH_SIZE / 2, BATCH_SIZE / 2, key / 
BATCH_SIZE);
+
+        if (res.size() != BATCH_SIZE / 2)
+            throw new AssertionError("Unexpected result size: " + res.size());
+    }
+
+    /**
+     * Query with INTERSECT set op.
+     */
+    @Benchmark
+    public void queryIntersect() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql(
+            "SELECT fld, fldIdx, fldBatch, fldIdxBatch FROM Item WHERE 
fldIdxBatch=? " +
+                "INTERSECT " +
+                "SELECT fld + ?, fldIdx + ?, fldBatch, fldIdxBatch FROM Item 
WHERE fldIdxBatch=?",
+            key / BATCH_SIZE, BATCH_SIZE / 2, BATCH_SIZE / 2, key / 
BATCH_SIZE);
+
+        if (res.size() != BATCH_SIZE / 2)
+            throw new AssertionError("Unexpected result size: " + res.size());
+    }
+
+    /**
+     * Query with correlated subquery.
+     */
+    @Benchmark
+    public void queryCorrelated() {
+        int key = ThreadLocalRandom.current().nextInt(KEYS_CNT);
+
+        List<?> res = executeSql(
+            "SELECT fld FROM Item i0 WHERE fldIdxBatch=? AND EXISTS " +
+                "(SELECT 1 FROM Item i1 WHERE i0.fld = i1.fld + ? AND 
i0.fldBatch = i1.fldBatch)",
+            key / BATCH_SIZE, BATCH_SIZE / 2);
+
+        // Skip result check for H2 engine, because query can't be executed 
correctly on H2.
+        if (!"H2".equals(engine) && res.size() != BATCH_SIZE / 2)
+            throw new AssertionError("Unexpected result size: " + res.size());
+    }
+
     /** */
     private List<List<?>> executeSql(String sql, Object... args) {
         return cache.query(new SqlFieldsQuery(sql).setArgs(args)).getAll();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java
new file mode 100644
index 00000000000..6d5a9b3f61b
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/MappingRowHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec;
+
+import java.lang.reflect.Type;
+import org.apache.calcite.util.ImmutableBitSet;
+
+/**
+ * Read only handler to process subset of columns.
+ */
+public class MappingRowHandler<Row> implements RowHandler<Row> {
+    /** */
+    private final RowHandler<Row> delegate;
+
+    /** */
+    private final int[] mapping;
+
+    /** */
+    public MappingRowHandler(RowHandler<Row> delegate, ImmutableBitSet 
requiredColumns) {
+        this.delegate = delegate;
+        mapping = requiredColumns.toArray();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object get(int field, Row row) {
+        return delegate.get(mapping[field], row);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int columnCount(Row row) {
+        return mapping.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void set(int field, Row row, Object val) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Row concat(Row left, Row right) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public RowFactory<Row> factory(Type... types) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
index 82973c64a37..0e670e1da5f 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/RuntimeHashIndex.java
@@ -25,27 +25,21 @@ import java.util.function.Supplier;
 import org.apache.calcite.util.ImmutableBitSet;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Runtime hash index based on on-heap hash map.
  */
 public class RuntimeHashIndex<Row> implements RuntimeIndex<Row> {
-    /**
-     * Placeholder for keys containing NULL values. Used to skip rows with 
such keys, since condition NULL=NULL
-     * should not satisfy the filter.
-     */
-    private static final GroupKey NULL_KEY = new 
GroupKey(X.EMPTY_OBJECT_ARRAY);
-
     /** */
     protected final ExecutionContext<Row> ectx;
 
     /** */
-    private final ImmutableBitSet keys;
+    private final RowHandler<Row> keysRowHnd;
 
     /** Rows. */
-    private final HashMap<GroupKey, List<Row>> rows;
+    private final HashMap<GroupKey<Row>, List<Row>> rows;
 
     /** Allow NULL values. */
     private final boolean allowNulls;
@@ -59,19 +53,19 @@ public class RuntimeHashIndex<Row> implements 
RuntimeIndex<Row> {
         boolean allowNulls
     ) {
         this.ectx = ectx;
+        this.allowNulls = allowNulls;
 
         assert !F.isEmpty(keys);
 
-        this.keys = keys;
-        this.allowNulls = allowNulls;
+        keysRowHnd = new MappingRowHandler<>(ectx.rowHandler(), keys);
         rows = new HashMap<>();
     }
 
     /** {@inheritDoc} */
     @Override public void push(Row r) {
-        GroupKey key = key(r);
+        GroupKey<Row> key = key(r);
 
-        if (key == NULL_KEY)
+        if (key == null)
             return;
 
         List<Row> eqRows = rows.computeIfAbsent(key, k -> new ArrayList<>());
@@ -89,20 +83,20 @@ public class RuntimeHashIndex<Row> implements 
RuntimeIndex<Row> {
         return new IndexScan(searchRow);
     }
 
-    /** */
-    private GroupKey key(Row r) {
-        GroupKey.Builder b = GroupKey.builder(keys.cardinality());
-
-        for (Integer field : keys) {
-            Object fieldVal = ectx.rowHandler().get(field, r);
-
-            if (fieldVal == null && !allowNulls)
-                return NULL_KEY;
-
-            b.add(fieldVal);
+    /**
+     * @return Group key for provided row. Can be {@code null} if key fields 
of row contain NULL values.
+     * Since condition NULL=NULL in SQL should not satisfy the filter (but 
nulls are allowed for
+     * IS NOT DISTINCT FROM condition).
+     */
+    private @Nullable GroupKey<Row> key(Row r) {
+        if (!allowNulls) {
+            for (int i = 0; i < keysRowHnd.columnCount(r); i++) {
+                if (keysRowHnd.get(i, r) == null)
+                    return null;
+            }
         }
 
-        return b.build();
+        return new GroupKey<>(r, keysRowHnd);
     }
 
     /**
@@ -121,9 +115,9 @@ public class RuntimeHashIndex<Row> implements 
RuntimeIndex<Row> {
 
         /** {@inheritDoc} */
         @NotNull @Override public Iterator<Row> iterator() {
-            GroupKey key = key(searchRow.get());
+            GroupKey<Row> key = key(searchRow.get());
 
-            if (key == NULL_KEY)
+            if (key == null)
                 return Collections.emptyIterator();
 
             List<Row> eqRows = rows.get(key);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
index fbf9265ba4c..42fb13507b9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/GroupKey.java
@@ -17,86 +17,98 @@
 
 package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;
 
-import java.util.Arrays;
-import org.apache.ignite.internal.util.typedef.X;
+import java.util.Objects;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ArrayRowHandler;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 
 /**
  *
  */
-public class GroupKey {
+public class GroupKey<Row> implements Binarylizable {
     /** */
-    public static final GroupKey EMPTY_GRP_KEY = new 
GroupKey(X.EMPTY_OBJECT_ARRAY);
+    private Row row;
 
     /** */
-    private final Object[] fields;
+    private RowHandler<Row> hnd;
 
     /** */
-    public GroupKey(Object[] fields) {
-        this.fields = fields;
+    public GroupKey(Row row, RowHandler<Row> hnd) {
+        this.row = row;
+        this.hnd = hnd;
     }
 
     /** */
-    public Object[] fields() {
-        return fields;
+    public Row row() {
+        return row;
+    }
+
+    /** */
+    public RowHandler<Row> rowHandler() {
+        return hnd;
     }
 
     /** {@inheritDoc} */
-    @Override public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
+    @Override public void writeBinary(BinaryWriter writer) throws 
BinaryObjectException {
+        BinaryRawWriter rawWriter = writer.rawWriter();
 
-        GroupKey grpKey = (GroupKey)o;
+        int colCnt = hnd.columnCount(row);
 
-        return Arrays.equals(fields, grpKey.fields);
+        rawWriter.writeInt(colCnt);
+
+        for (int i = 0; i < colCnt; i++)
+            rawWriter.writeObject(hnd.get(i, row));
     }
 
     /** {@inheritDoc} */
-    @Override public int hashCode() {
-        return Arrays.hashCode(fields);
-    }
+    @Override public void readBinary(BinaryReader reader) throws 
BinaryObjectException {
+        BinaryRawReader rawReader = reader.rawReader();
 
-    /** */
-    public static Builder builder(int rowLen) {
-        return new Builder(rowLen);
-    }
+        int colCnt = rawReader.readInt();
 
-    /** */
-    public static class Builder {
-        /** */
-        private final Object[] fields;
+        Object[] row0 = new Object[colCnt];
 
-        /** */
-        private int idx;
+        for (int i = 0; i < colCnt; i++)
+            row0[i] = rawReader.readObject();
 
-        /** */
-        private Builder(int rowLen) {
-            fields = new Object[rowLen];
-        }
+        row = (Row)row0;
+        hnd = (RowHandler<Row>)ArrayRowHandler.INSTANCE;
+    }
 
-        /** */
-        public Builder add(Object val) {
-            if (idx == fields.length)
-                throw new IndexOutOfBoundsException();
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
 
-            fields[idx++] = val;
+        GroupKey<Row> other = (GroupKey<Row>)o;
 
-            return this;
-        }
+        int colCnt = hnd.columnCount(row);
 
-        /** */
-        public GroupKey build() {
-            assert idx == fields.length;
+        if (colCnt != other.hnd.columnCount(other.row))
+            return false;
 
-            return new GroupKey(fields);
+        for (int i = 0; i < colCnt; i++) {
+            if (!Objects.equals(hnd.get(i, row), other.hnd.get(i, other.row)))
+                return false;
         }
 
-        /** */
-        public void clear() {
-            Arrays.fill(fields, null);
+        return true;
+    }
 
-            idx = 0;
-        }
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int hashCode = 0;
+
+        for (int i = 0; i < hnd.columnCount(row); i++)
+            hashCode = hashCode * 31 + Objects.hashCode(hnd.get(i, row));
+
+        return hashCode;
     }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
index 9c5fbafb103..9b2f6fcd277 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/AbstractSetOpNode.java
@@ -206,7 +206,7 @@ public abstract class AbstractSetOpNode<Row> extends 
MemoryTrackingNode<Row> {
     /** */
     protected abstract static class Grouping<Row> {
         /** */
-        protected final Map<GroupKey, int[]> groups = new HashMap<>();
+        protected final Map<GroupKey<Row>, int[]> groups = new HashMap<>();
 
         /** */
         protected final RowHandler<Row> hnd;
@@ -261,15 +261,8 @@ public abstract class AbstractSetOpNode<Row> extends 
MemoryTrackingNode<Row> {
         }
 
         /** */
-        protected GroupKey key(Row row) {
-            int size = hnd.columnCount(row);
-
-            Object[] fields = new Object[size];
-
-            for (int i = 0; i < size; i++)
-                fields[i] = hnd.get(i, row);
-
-            return new GroupKey(fields);
+        protected GroupKey<Row> key(Row row) {
+            return new GroupKey<>(row, hnd);
         }
 
         /** */
@@ -285,7 +278,7 @@ public abstract class AbstractSetOpNode<Row> extends 
MemoryTrackingNode<Row> {
 
         /** */
         protected void addOnReducer(Row row) {
-            GroupKey grpKey = (GroupKey)hnd.get(0, row);
+            GroupKey<Row> grpKey = key((Row)hnd.get(0, row));
             int[] cntrsMap = (int[])hnd.get(1, row);
 
             int[] cntrs = groups.computeIfAbsent(grpKey, k -> new 
int[cntrsMap.length]);
@@ -298,17 +291,17 @@ public abstract class AbstractSetOpNode<Row> extends 
MemoryTrackingNode<Row> {
 
         /** */
         protected List<Row> getOnMapper(int cnt) {
-            Iterator<Map.Entry<GroupKey, int[]>> it = 
groups.entrySet().iterator();
+            Iterator<Map.Entry<GroupKey<Row>, int[]>> it = 
groups.entrySet().iterator();
 
             int amount = Math.min(cnt, groups.size());
             List<Row> res = new ArrayList<>(amount);
 
             while (amount > 0 && it.hasNext()) {
-                Map.Entry<GroupKey, int[]> entry = it.next();
+                Map.Entry<GroupKey<Row>, int[]> entry = it.next();
 
                 // Skip row if it doesn't affect the final result.
                 if (affectResult(entry.getValue())) {
-                    res.add(rowFactory.create(entry.getKey(), 
entry.getValue()));
+                    res.add(rowFactory.create(entry.getKey().row(), 
entry.getValue()));
 
                     amount--;
                 }
@@ -321,16 +314,16 @@ public abstract class AbstractSetOpNode<Row> extends 
MemoryTrackingNode<Row> {
 
         /** */
         protected List<Row> getOnSingleOrReducer(int cnt) {
-            Iterator<Map.Entry<GroupKey, int[]>> it = 
groups.entrySet().iterator();
+            Iterator<Map.Entry<GroupKey<Row>, int[]>> it = 
groups.entrySet().iterator();
 
             List<Row> res = new ArrayList<>(cnt);
 
             while (it.hasNext() && cnt > 0) {
-                Map.Entry<GroupKey, int[]> entry = it.next();
+                Map.Entry<GroupKey<Row>, int[]> entry = it.next();
 
-                GroupKey key = entry.getKey();
+                GroupKey<Row> key = entry.getKey();
 
-                Row row = rowFactory.create(key.fields());
+                Row row = key.row();
 
                 int[] cntrs = entry.getValue();
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
index 39c8a1f74db..62985188cad 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateNode.java
@@ -24,13 +24,13 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.IgniteException;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.MappingRowHandler;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
@@ -241,52 +241,36 @@ public class HashAggregateNode<Row> extends 
AggregateNode<Row> {
         private final ImmutableBitSet grpFields;
 
         /** */
-        private final Map<GroupKey, List<AccumulatorWrapper<Row>>> groups = 
new HashMap<>();
+        private final Map<GroupKey<Row>, List<AccumulatorWrapper<Row>>> groups 
= new HashMap<>();
 
         /** */
-        private final RowHandler<Row> handler;
+        private final RowHandler<Row> hnd;
 
         /** */
-        private GroupKey.Builder grpKeyBld;
+        private final RowHandler<Row> keyGrpRowHnd;
 
         /** */
-        private final BiFunction<GroupKey, List<AccumulatorWrapper<Row>>, 
List<AccumulatorWrapper<Row>>> getOrCreateGroup;
-
-        /** */
-        private final Function<GroupKey, List<AccumulatorWrapper<Row>>> 
createGroup;
+        private final Function<GroupKey<Row>, List<AccumulatorWrapper<Row>>> 
createGrp;
 
         /** */
         private Grouping(byte grpId, ImmutableBitSet grpFields) {
             this.grpId = grpId;
             this.grpFields = grpFields;
 
-            grpKeyBld = GroupKey.builder(grpFields.cardinality());
-            handler = context().rowHandler();
-
-            createGroup = (k) -> create();
-
-            getOrCreateGroup = (k, v) -> {
-                if (v == null) {
-                    grpKeyBld = GroupKey.builder(grpFields.cardinality());
-
-                    return create();
-                }
-                else {
-                    grpKeyBld.clear();
+            hnd = context().rowHandler();
+            keyGrpRowHnd = new MappingRowHandler<>(hnd, grpFields);
 
-                    return v;
-                }
-            };
+            createGrp = k -> create();
 
             init();
         }
 
         /** */
         private void init() {
-            // Initializes aggregates for case when no any rows will be added 
into the aggregate to have 0 as result.
-            // Doesn't do it for MAP type due to we don't want send from MAP 
node zero results because it looks redundant.
+            // Initializes aggregates so they return 0 even if no rows are 
added. However, this initialization
+            // doesn't apply to MAP types since sending zero results from MAP 
nodes looks redundant
             if (grpFields.isEmpty() && (type == AggregateType.REDUCE || type 
== AggregateType.SINGLE))
-                groups.put(GroupKey.EMPTY_GRP_KEY, create());
+                groups.put(key(rowFactory.create()), create());
         }
 
         /** */
@@ -319,11 +303,13 @@ public class HashAggregateNode<Row> extends 
AggregateNode<Row> {
         }
 
         /** */
-        private void addOnMapper(Row row) {
-            for (Integer field : grpFields)
-                grpKeyBld.add(handler.get(field, row));
+        private GroupKey<Row> key(Row row) {
+            return new GroupKey<>(row, keyGrpRowHnd);
+        }
 
-            List<AccumulatorWrapper<Row>> wrappers = 
groups.compute(grpKeyBld.build(), getOrCreateGroup);
+        /** */
+        private void addOnMapper(Row row) {
+            List<AccumulatorWrapper<Row>> wrappers = 
groups.computeIfAbsent(key(row), createGrp);
 
             for (AccumulatorWrapper<Row> wrapper : wrappers)
                 wrapper.add(row);
@@ -331,15 +317,15 @@ public class HashAggregateNode<Row> extends 
AggregateNode<Row> {
 
         /** */
         private void addOnReducer(Row row) {
-            byte targetGrpId = (byte)handler.get(0, row);
+            byte targetGrpId = (byte)hnd.get(0, row);
 
             if (targetGrpId != grpId)
                 return;
 
-            GroupKey grpKey = (GroupKey)handler.get(1, row);
+            GroupKey<Row> grpKey = (GroupKey<Row>)hnd.get(1, row);
 
-            List<AccumulatorWrapper<Row>> wrappers = 
groups.computeIfAbsent(grpKey, createGroup);
-            Accumulator<Row>[] accums = hasAccumulators() ? 
(Accumulator<Row>[])handler.get(2, row) : null;
+            List<AccumulatorWrapper<Row>> wrappers = 
groups.computeIfAbsent(grpKey, createGrp);
+            Accumulator<Row>[] accums = hasAccumulators() ? 
(Accumulator<Row>[])hnd.get(2, row) : null;
 
             for (int i = 0; i < wrappers.size(); i++) {
                 AccumulatorWrapper<Row> wrapper = wrappers.get(i);
@@ -351,15 +337,15 @@ public class HashAggregateNode<Row> extends 
AggregateNode<Row> {
 
         /** */
         private List<Row> getOnMapper(int cnt) {
-            Iterator<Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>>> it = 
groups.entrySet().iterator();
+            Iterator<Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>>> 
it = groups.entrySet().iterator();
 
             int amount = Math.min(cnt, groups.size());
             List<Row> res = new ArrayList<>(amount);
 
             for (int i = 0; i < amount; i++) {
-                Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>> entry = 
it.next();
+                Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>> entry 
= it.next();
 
-                GroupKey grpKey = entry.getKey();
+                GroupKey<Row> grpKey = entry.getKey();
                 if (hasAccumulators()) {
                     List<AccumulatorWrapper<Row>> wrappers = entry.getValue();
                     Accumulator<Row>[] accums = new 
Accumulator[wrappers.size()];
@@ -380,25 +366,23 @@ public class HashAggregateNode<Row> extends 
AggregateNode<Row> {
 
         /** */
         private List<Row> getOnReducer(int cnt) {
-            Iterator<Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>>> it = 
groups.entrySet().iterator();
+            Iterator<Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>>> 
it = groups.entrySet().iterator();
 
             int amount = Math.min(cnt, groups.size());
             List<Row> res = new ArrayList<>(amount);
 
             for (int i = 0; i < amount; i++) {
-                Map.Entry<GroupKey, List<AccumulatorWrapper<Row>>> entry = 
it.next();
+                Map.Entry<GroupKey<Row>, List<AccumulatorWrapper<Row>>> entry 
= it.next();
 
-                GroupKey grpKey = entry.getKey();
+                GroupKey<Row> grpKey = entry.getKey();
                 List<AccumulatorWrapper<Row>> wrappers = entry.getValue();
 
                 Object[] fields = new Object[grpSet.cardinality() + 
wrappers.size()];
 
                 int j = 0, k = 0;
 
-                Object[] keyFields = grpKey.fields();
-
-                for (Integer field : grpSet)
-                    fields[j++] = grpFields.get(field) ? keyFields[k++] : null;
+                for (int field = grpSet.nextSetBit(0); field >= 0; field = 
grpSet.nextSetBit(field + 1))
+                    fields[j++] = grpFields.get(field) ? 
grpKey.rowHandler().get(k++, grpKey.row()) : null;
 
                 for (AccumulatorWrapper<Row> wrapper : wrappers)
                     fields[j++] = wrapper.end();
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
index cbeb7cdfc4c..1b4aa08a6b1 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/IntersectNode.java
@@ -58,7 +58,7 @@ public class IntersectNode<Row> extends 
AbstractSetOpNode<Row> {
         @Override protected void addOnSingle(Row row, int setIdx) {
             int[] cntrs;
 
-            GroupKey key = key(row);
+            GroupKey<Row> key = key(row);
 
             if (setIdx == 0) {
                 cntrs = groups.computeIfAbsent(key, k -> new int[inputsCnt]);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
index d289141324e..a989b050008 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/MinusNode.java
@@ -44,7 +44,7 @@ public class MinusNode<Row> extends AbstractSetOpNode<Row> {
         @Override protected void addOnSingle(Row row, int setIdx) {
             int[] cntrs;
 
-            GroupKey key = key(row);
+            GroupKey<Row> key = key(row);
 
             if (setIdx == 0) {
                 // Value in the map will always have 2 elements, first - count 
of keys in the first set,
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
index 4937bc64f69..1486a1ed862 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/tracker/ObjectSizeCalculator.java
@@ -104,7 +104,7 @@ public class ObjectSizeCalculator<Row> {
             addSysClsSize(cls, (c, bo) -> szFunc.applyAsInt(bo)));
 
         // Other.
-        addSysClsSize(GroupKey.class, (c, k) -> c.sizeOf0(k.fields(), true));
+        addSysClsSize(GroupKey.class, (c, k) -> c.sizeOf0(k.row(), true));
         addSysClsSize(UUID.class, null);
         addSysClsSize(BigDecimal.class, (c, bd) -> 
c.sizeOf0(bd.unscaledValue(), true));
         long intArrOffset = GridUnsafe.arrayBaseOffset(int[].class);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
index f38ec09b1b5..d840f4ad0e2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/rel/set/IgniteMapSetOp.java
@@ -27,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.util.Pair;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.AggregateType;
-import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.GroupKey;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.CorrelationTrait;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.IgniteDistributions;
 import 
org.apache.ignite.internal.processors.query.calcite.trait.RewindabilityTrait;
@@ -94,7 +93,7 @@ public interface IgniteMapSetOp extends IgniteSetOp {
 
         RelDataTypeFactory.Builder builder = new 
RelDataTypeFactory.Builder(typeFactory);
 
-        builder.add("GROUP_KEY", typeFactory.createJavaType(GroupKey.class));
+        builder.add("ROW", getInput(0).getRowType());
         builder.add("COUNTERS", typeFactory.createJavaType(int[].class));
 
         return builder.build();
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
index 1552708e2cc..b0bfe89e976 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/BaseAggregateTest.java
@@ -43,6 +43,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Aggregat
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Assert;
@@ -666,7 +667,7 @@ public abstract class BaseAggregateTest extends 
AbstractExecutionTest {
 
             /** */
             @Override public Object[] create() {
-                throw new AssertionError();
+                return X.EMPTY_OBJECT_ARRAY;
             }
 
             /** */
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
index 2098910ffd3..1b4ee59c5bb 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/HashAggregateSingleGroupExecutionTest.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.rel.agg.IgniteMapHash
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.junit.Before;
 import org.junit.Test;
@@ -518,7 +519,7 @@ public class HashAggregateSingleGroupExecutionTest extends 
AbstractExecutionTest
 
             /** */
             @Override public Object[] create() {
-                throw new AssertionError();
+                return X.EMPTY_OBJECT_ARRAY;
             }
 
             /** */
diff --git a/parent/pom.xml b/parent/pom.xml
index 1d7cc72c2ee..74f1c052c1d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -86,7 +86,7 @@
         <jetbrains.annotations.version>16.0.3</jetbrains.annotations.version>
         <jetty.version>11.0.24</jetty.version>
         
<jetty-jakarta-servlet-api.version>5.0.2</jetty-jakarta-servlet-api.version>
-        <jmh.version>1.36</jmh.version>
+        <jmh.version>1.37</jmh.version>
         <jna.version>4.5.2</jna.version>
         <jnr.posix.version>3.1.15</jnr.posix.version>
         <jotm.version>2.3.1-M1</jotm.version>


Reply via email to