This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-28813
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 5af5bf8dd07b7e7890d7aefd1b9a89933c824f5e
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Jun 24 14:19:33 2026 +0300

    IGNITE-28813 wip
---
 .../query/calcite/CalciteQueryProcessor.java       |  14 ++
 .../query/calcite/exec/exp/agg/Accumulators.java   |  87 ++++++-----
 .../exec/exp/agg/PluginAccumulatorsExtension.java  |  38 +++++
 .../AddAggregatFunctionViaPluginProviderTest.java  | 167 +++++++++++++++++++++
 .../ignite/testsuites/IntegrationTestSuite.java    |   2 +
 5 files changed, 271 insertions(+), 37 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index ca6fbd6f390..05ef25e19c2 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -82,6 +82,8 @@ import 
org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecuto
 import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
 import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
@@ -313,6 +315,8 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
         }
 
         distrCfg = new DistributedCalciteConfiguration(ctx, log);
+
+        extendAccumulatorsFromPlugins(ctx);
     }
 
     /**
@@ -853,4 +857,14 @@ public class CalciteQueryProcessor extends 
GridProcessorAdapter implements Query
     public InjectResourcesService injectService() {
         return injectSvc;
     }
+
+    /** */
+    private static void extendAccumulatorsFromPlugins(GridKernalContext ctx) {
+        PluginAccumulatorsExtension[] extensions = 
ctx.plugins().extensions(PluginAccumulatorsExtension.class);
+
+        if (!F.isEmpty(extensions)) {
+            for (PluginAccumulatorsExtension extension : extensions)
+                
Accumulators.addPluginAccumulatorFactories(extension.accumulatorFactories());
+        }
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
index 9c15e4a7a23..8af20faf6ff 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/Accumulators.java
@@ -24,8 +24,10 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.calcite.avatica.util.ByteString;
@@ -37,6 +39,7 @@ import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.type.SqlTypeName;
 import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
 import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension.PluginAccumulatorFactory;
 import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
@@ -54,6 +57,9 @@ import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
  *
  */
 public class Accumulators {
+    /** */
+    private static final Map<String, PluginAccumulatorFactory<?>> 
PLUGIN_FACTORY_BY_NAME = new ConcurrentHashMap<>();
+
     /** */
     public static <Row> Supplier<Accumulator<Row>> 
accumulatorFactory(AggregateCall call, ExecutionContext<Row> ctx) {
         Supplier<Accumulator<Row>> supplier = accumulatorFunctionFactory(call, 
ctx);
@@ -71,38 +77,29 @@ public class Accumulators {
     ) {
         RowHandler<Row> hnd = ctx.rowHandler();
 
-        switch (call.getAggregation().getName()) {
-            case "COUNT":
-                return () -> new LongCount<>(call, hnd);
-            case "AVG":
-                return avgFactory(call, hnd);
-            case "SUM":
-                return sumFactory(call, hnd);
-            case "$SUM0":
-                return sumEmptyIsZeroFactory(call, hnd);
-            case "MIN":
-            case "EVERY":
-                return minFactory(call, hnd);
-            case "MAX":
-            case "SOME":
-                return maxFactory(call, hnd);
-            case "SINGLE_VALUE":
-                return () -> new SingleVal<>(call, hnd);
-            case "LITERAL_AGG":
-                return () -> new LiteralVal<>(call, hnd);
-            case "ANY_VALUE":
-                return () -> new AnyVal<>(call, hnd);
-            case "LISTAGG":
-            case "ARRAY_AGG":
-            case "ARRAY_CONCAT_AGG":
-                return listAggregateSupplier(call, ctx);
-            case "BIT_AND":
-            case "BIT_OR":
-            case "BIT_XOR":
-                return bitWiseFactory(call, hnd);
-            default:
-                throw new AssertionError(call.getAggregation().getName());
-        }
+        String aggFunName = call.getAggregation().getName();
+
+        return switch (aggFunName) {
+            case "COUNT" -> () -> new LongCount<>(call, hnd);
+            case "AVG" -> avgFactory(call, hnd);
+            case "SUM" -> sumFactory(call, hnd);
+            case "$SUM0" -> sumEmptyIsZeroFactory(call, hnd);
+            case "MIN", "EVERY" -> minFactory(call, hnd);
+            case "MAX", "SOME" -> maxFactory(call, hnd);
+            case "SINGLE_VALUE" -> () -> new SingleVal<>(call, hnd);
+            case "LITERAL_AGG" -> () -> new LiteralVal<>(call, hnd);
+            case "ANY_VALUE" -> () -> new AnyVal<>(call, hnd);
+            case "LISTAGG", "ARRAY_AGG", "ARRAY_CONCAT_AGG" -> 
listAggregateSupplier(call, ctx);
+            case "BIT_AND", "BIT_OR", "BIT_XOR" -> bitWiseFactory(call, hnd);
+            default -> {
+                PluginAccumulatorFactory<Row> factory = 
(PluginAccumulatorFactory<Row>) PLUGIN_FACTORY_BY_NAME.get(aggFunName);
+
+                if (factory == null)
+                    throw new AssertionError("Accumulator factory not found 
for: " + aggFunName);
+
+                yield () -> factory.create(call, ctx);
+            }
+        };
     }
 
     /** */
@@ -280,7 +277,7 @@ public class Accumulators {
     }
 
     /** */
-    private abstract static class AbstractAccumulator<Row> implements 
Accumulator<Row> {
+    public abstract static class AbstractAccumulator<Row> implements 
Accumulator<Row> {
         /** */
         private final RowHandler<Row> hnd;
 
@@ -288,13 +285,13 @@ public class Accumulators {
         private final transient AggregateCall aggCall;
 
         /** */
-        AbstractAccumulator(AggregateCall aggCall, RowHandler<Row> hnd) {
+        protected AbstractAccumulator(AggregateCall aggCall, RowHandler<Row> 
hnd) {
             this.aggCall = aggCall;
             this.hnd = hnd;
         }
 
         /** */
-        <T> T get(int idx, Row row) {
+        protected <T> T get(int idx, Row row) {
             assert idx < arguments().size() : "idx=" + idx + "; arguments=" + 
arguments();
 
             return (T)hnd.get(arguments().get(idx), row);
@@ -311,7 +308,7 @@ public class Accumulators {
         }
 
         /** */
-        int columnCount(Row row) {
+        protected int columnCount(Row row) {
             return hnd.columnCount(row);
         }
     }
@@ -1344,8 +1341,9 @@ public class Accumulators {
                 if (builder == null)
                     builder = new StringBuilder();
 
-                if (builder.length() != 0)
+                if (!builder.isEmpty())
                     builder.append(extractSeparator(row));
+
                 builder.append(val);
             }
 
@@ -1510,4 +1508,19 @@ public class Accumulators {
             return acc.returnType(typeFactory);
         }
     }
+
+    /** */
+    public static void addPluginAccumulatorFactories(Map<String, 
PluginAccumulatorFactory<?>> factoryByAggFunName) {
+        for (Map.Entry<String, PluginAccumulatorFactory<?>> e : 
factoryByAggFunName.entrySet()) {
+            String aggFunName = e.getKey().trim().toUpperCase(Locale.ROOT);
+
+            if (aggFunName.isBlank())
+                throw new AssertionError("Invalid aggregate function name: " + 
aggFunName);
+
+            PluginAccumulatorFactory<?> prev = 
PLUGIN_FACTORY_BY_NAME.putIfAbsent(aggFunName, e.getValue());
+
+//            if (prev != null)
+//                throw new AssertionError("Duplicate aggregate function name: 
" + aggFunName);
+        }
+    }
 }
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java
new file mode 100644
index 00000000000..aa80d7f11c7
--- /dev/null
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/exp/agg/PluginAccumulatorsExtension.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;
+
+import java.util.Map;
+import org.apache.calcite.rel.core.AggregateCall;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
+import org.apache.ignite.plugin.Extension;
+import org.apache.ignite.plugin.PluginProvider;
+
+/** Class for extending {@link Accumulators} via {@link PluginProvider 
plugins}. */
+@FunctionalInterface
+public interface PluginAccumulatorsExtension extends Extension {
+    /** @return Accumulator factories by aggregate function name. Name must be 
non-empty and unique. */
+    Map<String, PluginAccumulatorFactory<?>> accumulatorFactories();
+
+    /** */
+    @FunctionalInterface
+    interface PluginAccumulatorFactory<Row> {
+        /** */
+        Accumulator<Row> create(AggregateCall call, ExecutionContext<Row> ctx);
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java
 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java
new file mode 100644
index 00000000000..b53ee861189
--- /dev/null
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/AddAggregatFunctionViaPluginProviderTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.calcite.integration;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.util.ReflectiveSqlOperatorTable;
+import org.apache.calcite.sql.util.SqlOperatorTables;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.util.Optionality;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor;
+import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulator;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.Accumulators.AbstractAccumulator;
+import 
org.apache.ignite.internal.processors.query.calcite.exec.exp.agg.PluginAccumulatorsExtension;
+import 
org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.jspecify.annotations.Nullable;
+import org.junit.Test;
+
+/** Test for adding aggregat function via {@link PluginProvider}. */
+public class AddAggregatFunctionViaPluginProviderTest extends 
AbstractBasicIntegrationTest {
+    /** */
+    private static final String TEST_SUM_FUN_NAME = "TEST_SUM";
+
+    /** {@inheritDoc} */
+    @Override protected int nodeCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setPluginProviders(new TestPluginProvider());
+    }
+
+    /** */
+    @Test
+    public void test() {
+        assertQuery("SELECT TEST_SUM(x) FROM (VALUES (1), (2), (3)) t(x)")
+            .returns(6L)
+            .check();
+    }
+
+    /** */
+    private static class TestPluginProvider extends AbstractTestPluginProvider 
{
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return getClass().getName();
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable <T> T createComponent(PluginContext ctx, 
Class<T> cls) {
+            if (!FrameworkConfig.class.equals(cls))
+                return null;
+
+            FrameworkConfig cfg = CalciteQueryProcessor.FRAMEWORK_CONFIG;
+
+            return (T) Frameworks.newConfigBuilder(cfg)
+                .operatorTable(SqlOperatorTables.chain(
+                    new TestSqlOperatorTable().init(), cfg.getOperatorTable()
+                ))
+                .build();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void initExtensions(PluginContext ctx, 
ExtensionRegistry registry) {
+            registry.registerExtension(
+                PluginAccumulatorsExtension.class,
+                () -> Map.of(TEST_SUM_FUN_NAME, (call, ctx1) -> new 
TestSum<>(call, ctx1.rowHandler()))
+            );
+        }
+    }
+
+    /** */
+    public static class TestSqlSumAggFunction extends SqlAggFunction {
+        /** */
+        public TestSqlSumAggFunction() {
+            super(
+                TEST_SUM_FUN_NAME,
+                null,
+                SqlKind.SUM,
+                ReturnTypes.AGG_SUM,
+                null,
+                OperandTypes.NUMERIC,
+                SqlFunctionCategory.NUMERIC,
+                false,
+                false,
+                Optionality.FORBIDDEN
+            );
+        }
+    }
+
+    /** */
+    public static class TestSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
+        /** */
+        @SuppressWarnings("unused")
+        public static final SqlAggFunction TEST_SUM = new 
TestSqlSumAggFunction();
+    }
+
+    /** */
+    private static class TestSum<Row> extends AbstractAccumulator<Row> {
+        /** */
+        private long sum;
+
+        /** */
+        protected TestSum(AggregateCall aggCall, RowHandler<Row> hnd) {
+            super(aggCall, hnd);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void add(Row row) {
+            Number val = get(0, row);
+
+            if (val != null)
+                sum += val.longValue();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(Accumulator<Row> other) {
+            sum += ((TestSum<Row>)other).sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object end() {
+            return sum;
+        }
+
+        /** {@inheritDoc} */
+        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+            return 
List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.BIGINT),
 true));
+        }
+
+        /** {@inheritDoc} */
+        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+            return 
typeFactory.createSqlType(org.apache.calcite.sql.type.SqlTypeName.BIGINT);
+        }
+    }
+}
diff --git 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
index f4e123c6a7c..177e07c04a7 100644
--- 
a/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
+++ 
b/modules/calcite/src/test/java/org/apache/ignite/testsuites/IntegrationTestSuite.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor
 import org.apache.ignite.internal.processors.query.calcite.CancelTest;
 import 
org.apache.ignite.internal.processors.query.calcite.IndexWithSameNameCalciteTest;
 import 
org.apache.ignite.internal.processors.query.calcite.SqlFieldsQueryUsageTest;
+import 
org.apache.ignite.internal.processors.query.calcite.integration.AddAggregatFunctionViaPluginProviderTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.AggregatesIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.AuthorizationIntegrationTest;
 import 
org.apache.ignite.internal.processors.query.calcite.integration.CacheStoreTest;
@@ -183,6 +184,7 @@ import org.junit.runners.Suite;
     CacheWithInterceptorIntegrationTest.class,
     TxThreadLockingTest.class,
     SelectByKeyFieldTest.class,
+    AddAggregatFunctionViaPluginProviderTest.class,
 })
 public class IntegrationTestSuite {
 }

Reply via email to