This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c0bad1282 [GLUTEN-2163][CH] support aggregate function 
approx_percentile  (#4829)
c0bad1282 is described below

commit c0bad12820961c5f3650af4c247d0fe1f270c919
Author: 李扬 <654010...@qq.com>
AuthorDate: Fri Mar 29 14:11:26 2024 +0800

    [GLUTEN-2163][CH] support aggregate function approx_percentile  (#4829)
    
    [CH] support aggregate function approx_percentile
---
 .../execution/CHHashAggregateExecTransformer.scala |   8 ++
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala |  13 ++
 .../backendsapi/velox/VeloxBackend.scala           |   8 +-
 .../Operator/DefaultHashAggregateResult.h          |   2 +-
 .../Parser/AggregateFunctionParser.cpp             |  14 +--
 .../local-engine/Parser/AggregateFunctionParser.h  |   9 +-
 cpp-ch/local-engine/Parser/AggregateRelParser.cpp  |  42 +++++--
 .../local-engine/Parser/SerializedPlanParser.cpp   |  16 ++-
 cpp-ch/local-engine/Parser/TypeParser.cpp          |  27 ++--
 cpp-ch/local-engine/Parser/TypeParser.h            |   3 +-
 cpp-ch/local-engine/Parser/WindowRelParser.cpp     |   7 +-
 .../ApproxPercentileParser.cpp                     | 140 +++++++++++++++++++++
 .../ApproxPercentileParser.h                       |  48 +++++++
 .../BloomFilterAggParser.h                         |   4 +-
 .../aggregate_function_parser/CollectListParser.h  |   8 +-
 .../CommonAggregateFunctionParser.h                |   2 +-
 .../aggregate_function_parser/CountParser.cpp      |   7 +-
 .../Parser/aggregate_function_parser/CountParser.h |   2 +-
 .../aggregate_function_parser/LeadLagParser.h      |   4 +-
 .../local-engine/Storages/SourceFromJavaIter.cpp   |  78 ++++++++----
 cpp-ch/local-engine/Storages/SourceFromJavaIter.h  |  11 +-
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp |  27 ++--
 .../expression/ExpressionMappings.scala            |   3 +-
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   1 +
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   1 +
 .../utils/clickhouse/ClickHouseTestSettings.scala  |   1 +
 .../glutenproject/expression/ExpressionNames.scala |   1 +
 27 files changed, 381 insertions(+), 106 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/io/glutenproject/execution/CHHashAggregateExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/CHHashAggregateExecTransformer.scala
index 13d02ffa7..8a312c456 100644
--- 
a/backends-clickhouse/src/main/scala/io/glutenproject/execution/CHHashAggregateExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/CHHashAggregateExecTransformer.scala
@@ -365,6 +365,14 @@ case class CHHashAggregateExecTransformer(
                 fields = fields :+ (child.dataType, child.nullable)
               }
               (makeStructType(fields), false)
+            case approxPercentile: ApproximatePercentile =>
+              var fields = Seq[(DataType, Boolean)]()
+              // Use approxPercentile.nullable as the nullable of the struct 
type
+              // to make sure it returns null when input is empty
+              fields = fields :+ (approxPercentile.child.dataType, 
approxPercentile.nullable)
+              fields = fields :+ 
(approxPercentile.percentageExpression.dataType,
+              approxPercentile.percentageExpression.nullable)
+              (makeStructType(fields), attr.nullable)
             case _ =>
               (makeStructTypeSingleOne(attr.dataType, attr.nullable), 
attr.nullable)
           }
diff --git 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 5c9d31a01..098b0d9e2 100644
--- 
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -2465,6 +2465,19 @@ class GlutenClickHouseTPCHSaltNullParquetSuite extends 
GlutenClickHouseTPCHAbstr
     spark.sql("drop table test_tbl_4997")
   }
 
+  test("aggregate function approx_percentile") {
+    // single percentage
+    val sql1 = "select l_linenumber % 10, approx_percentile(l_extendedprice, 
0.5) " +
+      "from lineitem group by l_linenumber % 10"
+    runQueryAndCompare(sql1)({ _ => })
+
+    // multiple percentages
+    val sql2 =
+      "select l_linenumber % 10, approx_percentile(l_extendedprice, array(0.1, 
0.2, 0.3)) " +
+        "from lineitem group by l_linenumber % 10"
+    runQueryAndCompare(sql2)({ _ => })
+  }
+
   test("GLUTEN-5096: Bug fix regexp_extract diff") {
     val tbl_create_sql = "create table test_tbl_5096(id bigint, data string) 
using parquet"
     val tbl_insert_sql = "insert into test_tbl_5096 values(1, 'abc'), (2, 
'abc\n')"
diff --git 
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
index 25d389f3c..e27b8084e 100644
--- 
a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala
@@ -28,7 +28,7 @@ import 
io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFor
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, 
Descending, Expression, Lag, Lead, Literal, MakeYMInterval, NamedExpression, 
NthValue, NTile, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, 
SpecialFrameBoundary, SpecifiedWindowFrame, Uuid}
-import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Count, Sum}
+import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
ApproximatePercentile, Count, Sum}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
@@ -335,8 +335,10 @@ object BackendSettings extends BackendSettingsApi {
             case _ =>
           }
           windowExpression.windowFunction match {
-            case _: RowNumber | _: AggregateExpression | _: Rank | _: CumeDist 
| _: DenseRank |
-                _: PercentRank | _: NthValue | _: NTile | _: Lag | _: Lead =>
+            case _: RowNumber | _: Rank | _: CumeDist | _: DenseRank | _: 
PercentRank |
+                _: NthValue | _: NTile | _: Lag | _: Lead =>
+            case aggrExpr: AggregateExpression
+                if 
!aggrExpr.aggregateFunction.isInstanceOf[ApproximatePercentile] =>
             case _ =>
               allSupported = false
           }
diff --git a/cpp-ch/local-engine/Operator/DefaultHashAggregateResult.h 
b/cpp-ch/local-engine/Operator/DefaultHashAggregateResult.h
index 5729a4770..b433f4072 100644
--- a/cpp-ch/local-engine/Operator/DefaultHashAggregateResult.h
+++ b/cpp-ch/local-engine/Operator/DefaultHashAggregateResult.h
@@ -22,7 +22,7 @@
 namespace local_engine
 {
 
-/// Special case: goruping keys is empty, and there is no input from 
updstream, but still need to return one default row.
+/// Special case: goruping keys is empty, and there is no input from upstream, 
but still need to return one default row.
 class DefaultHashAggregateResultStep : public DB::ITransformingStep
 {
 public:
diff --git a/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp 
b/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
index 708a16914..56cd58ad9 100644
--- a/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
+++ b/cpp-ch/local-engine/Parser/AggregateFunctionParser.cpp
@@ -148,19 +148,11 @@ const DB::ActionsDAG::Node * 
AggregateFunctionParser::convertNodeTypeIfNeeded(
     const CommonFunctionInfo & func_info,
     const DB::ActionsDAG::Node * func_node,
     DB::ActionsDAGPtr & actions_dag,
-    bool withNullability) const
+    bool with_nullability) const
 {
     const auto & output_type = func_info.output_type;
-    bool needToConvertNodeType = false;
-    if (withNullability)
-    {
-        needToConvertNodeType = 
!TypeParser::isTypeMatchedWithNullability(output_type, func_node->result_type);
-    }
-    else
-    {
-        needToConvertNodeType = !TypeParser::isTypeMatched(output_type, 
func_node->result_type);
-    }
-    if (needToConvertNodeType)
+    bool need_convert_type = !TypeParser::isTypeMatched(output_type, 
func_node->result_type, !with_nullability);
+    if (need_convert_type)
     {
         func_node = ActionsDAGUtil::convertNodeType(
             actions_dag, func_node, 
TypeParser::parseType(output_type)->getName(), func_node->result_name);
diff --git a/cpp-ch/local-engine/Parser/AggregateFunctionParser.h 
b/cpp-ch/local-engine/Parser/AggregateFunctionParser.h
index e2444361f..464ad099a 100644
--- a/cpp-ch/local-engine/Parser/AggregateFunctionParser.h
+++ b/cpp-ch/local-engine/Parser/AggregateFunctionParser.h
@@ -89,9 +89,11 @@ public:
     /// In some special cases, different arguments size or different arguments 
types may refer to different
     /// CH function implementation.
     virtual String getCHFunctionName(const CommonFunctionInfo & func_info) 
const = 0;
+
     /// In most cases, arguments size and types are enough to determine the CH 
function implementation.
-    /// This is only be used in SerializedPlanParser::parseNameStructure.
-    virtual String getCHFunctionName(const DB::DataTypes & args) const = 0;
+    /// It is only be used in TypeParser::buildBlockFromNamedStruct
+    /// Users are allowed to modify arg types to make it fit for 
ggregateFunctionFactory::instance().get(...) in 
TypeParser::buildBlockFromNamedStruct
+    virtual String getCHFunctionName(DB::DataTypes & args) const = 0;
 
     /// Do some preprojections for the function arguments, and return the 
necessary arguments for the CH function.
     virtual DB::ActionsDAG::NodeRawConstPtrs
@@ -112,7 +114,8 @@ public:
     virtual const DB::ActionsDAG::Node * convertNodeTypeIfNeeded(
         const CommonFunctionInfo & func_info,
         const DB::ActionsDAG::Node * func_node,
-        DB::ActionsDAGPtr & actions_dag, bool withNullability) const;
+        DB::ActionsDAGPtr & actions_dag,
+        bool with_nullability) const;
 
     /// Parameters are only used in aggregate functions at present. e.g. 
percentiles(0.5)(x).
     /// 0.5 is the parameter of percentiles function.
diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp 
b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
index a3ab329f0..d20f30e41 100644
--- a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp
@@ -54,12 +54,14 @@ AggregateRelParser::AggregateRelParser(SerializedPlanParser 
* plan_paser_) : Rel
 DB::QueryPlanPtr AggregateRelParser::parse(DB::QueryPlanPtr query_plan, const 
substrait::Rel & rel, std::list<const substrait::Rel *> &)
 {
     setup(std::move(query_plan), rel);
+
     addPreProjection();
     LOG_TRACE(logger, "header after pre-projection is: {}", 
plan->getCurrentDataStream().header.dumpStructure());
     if (has_final_stage)
     {
         addMergingAggregatedStep();
         LOG_TRACE(logger, "header after merging is: {}", 
plan->getCurrentDataStream().header.dumpStructure());
+
         addPostProjection();
         LOG_TRACE(logger, "header after post-projection is: {}", 
plan->getCurrentDataStream().header.dumpStructure());
     }
@@ -67,6 +69,7 @@ DB::QueryPlanPtr AggregateRelParser::parse(DB::QueryPlanPtr 
query_plan, const su
     {
         addCompleteModeAggregatedStep();
         LOG_TRACE(logger, "header after complete aggregate is: {}", 
plan->getCurrentDataStream().header.dumpStructure());
+
         addPostProjection();
         LOG_TRACE(logger, "header after post-projection is: {}", 
plan->getCurrentDataStream().header.dumpStructure());
     }
@@ -184,6 +187,8 @@ void AggregateRelParser::addPreProjection()
     }
     if (projection_action->dumpDAG() != dag_footprint)
     {
+        /// Avoid unnecessary evaluation
+        projection_action->removeUnusedActions();
         auto projection_step = 
std::make_unique<DB::ExpressionStep>(plan->getCurrentDataStream(), 
projection_action);
         projection_step->setStepDescription("Projection before aggregate");
         steps.emplace_back(projection_step.get());
@@ -193,22 +198,41 @@ void AggregateRelParser::addPreProjection()
 
 void AggregateRelParser::buildAggregateDescriptions(AggregateDescriptions & 
descriptions)
 {
-    auto build_result_column_name = [](const String & function_name, const 
Strings & arg_column_names, substrait::AggregationPhase phase)
+    auto build_result_column_name = [](const String & function_name, const 
Array & params, const Strings & arg_names, substrait::AggregationPhase phase)
     {
         if (phase == 
substrait::AggregationPhase::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT)
         {
-            assert(arg_column_names.size() == 1);
-            return arg_column_names[0];
+            assert(arg_names.size() == 1);
+            return arg_names[0];
+        }
+
+        String result = function_name;
+        if (!params.empty())
+        {
+            result += "(";
+            for (size_t i = 0; i < params.size(); ++i)
+            {
+                if (i != 0)
+                    result += ",";
+                result += toString(params[i]);
+            }
+            result += ")";
         }
-        String arg_list_str = boost::algorithm::join(arg_column_names, ",");
-        return function_name + "(" + arg_list_str + ")";
+
+        result += "(";
+        result += boost::algorithm::join(arg_names, ",");
+        result += ")";
+        return result;
     };
+
     for (auto & agg_info : aggregates)
     {
         AggregateDescription description;
         const auto & measure = agg_info.measure->measure();
-        description.column_name = 
build_result_column_name(agg_info.function_name, agg_info.arg_column_names, 
measure.phase());
+        description.column_name
+            = build_result_column_name(agg_info.function_name, 
agg_info.params, agg_info.arg_column_names, measure.phase());
         agg_info.measure_column_name = description.column_name;
+        // std::cout << "description.column_name:" << description.column_name 
<< std::endl;
         description.argument_names = agg_info.arg_column_names;
         DB::AggregateFunctionProperties properties;
 
@@ -259,7 +283,7 @@ void AggregateRelParser::addMergingAggregatedStep()
 {
     AggregateDescriptions aggregate_descriptions;
     buildAggregateDescriptions(aggregate_descriptions);
-    auto settings = getContext()->getSettingsRef();
+    const auto & settings = getContext()->getSettingsRef();
     Aggregator::Params params(
         grouping_keys,
         aggregate_descriptions,
@@ -298,7 +322,7 @@ void AggregateRelParser::addCompleteModeAggregatedStep()
 {
     AggregateDescriptions aggregate_descriptions;
     buildAggregateDescriptions(aggregate_descriptions);
-    auto settings = getContext()->getSettingsRef();
+    const auto & settings = getContext()->getSettingsRef();
     bool enable_streaming_aggregating = 
getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
     if (enable_streaming_aggregating)
     {
@@ -376,7 +400,7 @@ void AggregateRelParser::addAggregatingStep()
 {
     AggregateDescriptions aggregate_descriptions;
     buildAggregateDescriptions(aggregate_descriptions);
-    auto settings = getContext()->getSettingsRef();
+    const auto & settings = getContext()->getSettingsRef();
     bool enable_streaming_aggregating = 
getContext()->getConfigRef().getBool("enable_streaming_aggregating", true);
 
     if (enable_streaming_aggregating)
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index f0715f500..1b7f0448f 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -89,6 +89,7 @@
 #include <Common/MergeTreeTool.h>
 #include <Common/logger_useful.h>
 #include <Common/typeid_cast.h>
+#include <Common/JNIUtils.h>
 
 namespace DB
 {
@@ -312,12 +313,17 @@ QueryPlanStepPtr 
SerializedPlanParser::parseReadRealWithJavaIter(const substrait
     auto iter = rel.local_files().items().at(0).uri_file();
     auto pos = iter.find(':');
     auto iter_index = std::stoi(iter.substr(pos + 1, iter.size()));
+    jobject input_iter = input_iters[iter_index];
+    bool materialize_input = materialize_inputs[iter_index];
+
+    GET_JNIENV(env)
+    SCOPE_EXIT({CLEAN_JNIENV});
+    auto * first_block = SourceFromJavaIter::peekBlock(env, input_iter);
+
+    /// Try to decide header from the first block read from Java iterator. 
Thus AggregateFunction with parameters has more precise types.
+    auto header = first_block ? first_block->cloneEmpty() : 
TypeParser::buildBlockFromNamedStruct(rel.base_schema());
+    auto source = std::make_shared<SourceFromJavaIter>(context, 
std::move(header), input_iter, materialize_input, first_block);
 
-    auto source = std::make_shared<SourceFromJavaIter>(
-        context,
-        TypeParser::buildBlockFromNamedStruct(rel.base_schema()),
-        input_iters[iter_index],
-        materialize_inputs[iter_index]);
     QueryPlanStepPtr source_step = 
std::make_unique<ReadFromPreparedSource>(Pipe(source));
     source_step->setStepDescription("Read From Java Iter");
     return source_step;
diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp 
b/cpp-ch/local-engine/Parser/TypeParser.cpp
index 12c23e606..3ad19bb2b 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.cpp
+++ b/cpp-ch/local-engine/Parser/TypeParser.cpp
@@ -280,6 +280,8 @@ DB::Block TypeParser::buildBlockFromNamedStruct(
             auto tmp_ctx = 
DB::Context::createCopy(SerializedPlanParser::global_context);
             SerializedPlanParser tmp_plan_parser(tmp_ctx);
             auto function_parser = 
AggregateFunctionParserFactory::instance().get(name_parts[3], &tmp_plan_parser);
+            /// This may remove elements from args_types, because some of them 
are used to determine CH function name, but not needed for the following
+            /// call `AggregateFunctionFactory::instance().get`
             auto agg_function_name = 
function_parser->getCHFunctionName(args_types);
             auto action = NullsAction::EMPTY;
             ch_type = AggregateFunctionFactory::instance()
@@ -316,21 +318,20 @@ DB::Block 
TypeParser::buildBlockFromNamedStructWithoutDFS(const substrait::Named
     return res;
 }
 
-bool TypeParser::isTypeMatched(const substrait::Type & substrait_type, const 
DataTypePtr & ch_type)
+bool TypeParser::isTypeMatched(const substrait::Type & substrait_type, const 
DataTypePtr & ch_type, bool ignore_nullability)
 {
     const auto parsed_ch_type = TypeParser::parseType(substrait_type);
-    // if it's only different in nullability, we consider them same.
-    // this will be problematic for some functions being not-null in spark but 
nullable in clickhouse.
-    // e.g. murmur3hash
-    const auto a = removeNullable(parsed_ch_type);
-    const auto b = removeNullable(ch_type);
-    return a->equals(*b);
-}
-
-bool TypeParser::isTypeMatchedWithNullability(const substrait::Type & 
substrait_type, const DataTypePtr & ch_type)
-{
-    const auto parsed_ch_type = TypeParser::parseType(substrait_type);
-    return parsed_ch_type->equals(*ch_type);
+    if (ignore_nullability)
+    {
+        // if it's only different in nullability, we consider them same.
+        // this will be problematic for some functions being not-null in spark 
but nullable in clickhouse.
+        // e.g. murmur3hash
+        const auto a = removeNullable(parsed_ch_type);
+        const auto b = removeNullable(ch_type);
+        return a->equals(*b);
+    }
+    else
+        return parsed_ch_type->equals(*ch_type);
 }
 
 DB::DataTypePtr TypeParser::tryWrapNullable(substrait::Type_Nullability 
nullable, DB::DataTypePtr nested_type)
diff --git a/cpp-ch/local-engine/Parser/TypeParser.h 
b/cpp-ch/local-engine/Parser/TypeParser.h
index c687c3024..666effff4 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.h
+++ b/cpp-ch/local-engine/Parser/TypeParser.h
@@ -49,8 +49,7 @@ namespace local_engine
         /// Build block from substrait NamedStruct without DFS rules, 
different from buildBlockFromNamedStruct
         static DB::Block buildBlockFromNamedStructWithoutDFS(const 
substrait::NamedStruct& struct_);
 
-        static bool isTypeMatched(const substrait::Type& substrait_type, const 
DB::DataTypePtr& ch_type);
-        static bool isTypeMatchedWithNullability(const substrait::Type& 
substrait_type, const DB::DataTypePtr& ch_type);
+        static bool isTypeMatched(const substrait::Type & substrait_type, 
const DB::DataTypePtr & ch_type, bool ignore_nullability = true);
 
     private:
         /// Mapping spark type names to CH type names.
diff --git a/cpp-ch/local-engine/Parser/WindowRelParser.cpp 
b/cpp-ch/local-engine/Parser/WindowRelParser.cpp
index a1787a2c9..4125879b5 100644
--- a/cpp-ch/local-engine/Parser/WindowRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/WindowRelParser.cpp
@@ -83,7 +83,7 @@ WindowRelParser::parse(DB::QueryPlanPtr current_plan_, const 
substrait::Rel & re
     for (auto & it : window_descriptions)
     {
         auto & win = it.second;
-        
+
         auto window_step = 
std::make_unique<DB::WindowStep>(current_plan->getCurrentDataStream(), win, 
win.window_functions, false);
         window_step->setStepDescription("Window step for window '" + 
win.window_name + "'");
         steps.emplace_back(window_step.get());
@@ -328,13 +328,14 @@ void WindowRelParser::tryAddProjectionBeforeWindow()
     for (auto & win_info : win_infos )
     {
         auto arg_nodes = 
win_info.function_parser->parseFunctionArguments(win_info.parser_func_info, 
actions_dag);
+        // This may remove elements from arg_nodes, because some of them are 
converted to CH func parameters.
+        win_info.params = 
win_info.function_parser->parseFunctionParameters(win_info.parser_func_info, 
arg_nodes);
         for (auto & arg_node : arg_nodes)
         {
             win_info.arg_column_names.emplace_back(arg_node->result_name);
             win_info.arg_column_types.emplace_back(arg_node->result_type);
             actions_dag->addOrReplaceInOutputs(*arg_node);
-        } 
-        win_info.params = 
win_info.function_parser->parseFunctionParameters(win_info.parser_func_info, 
arg_nodes);       
+        }
     }
 
     if (actions_dag->dumpDAG() != dag_footprint)
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxPercentileParser.cpp
 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxPercentileParser.cpp
new file mode 100644
index 000000000..9a164de14
--- /dev/null
+++ 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxPercentileParser.cpp
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <cmath>
+#include <string>
+#include <DataTypes/DataTypeAggregateFunction.h>
+#include <Interpreters/ActionsDAG.h>
+#include <Parser/AggregateFunctionParser.h>
+#include <Parser/aggregate_function_parser/ApproxPercentileParser.h>
+#include <Poco/StringTokenizer.h>
+#include <Common/CHUtil.h>
+#include "substrait/algebra.pb.h"
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+}
+}
+
+namespace local_engine
+{
+
+void ApproxPercentileParser::assertArgumentsSize(substrait::AggregationPhase 
phase, size_t size, size_t expect) const
+{
+    if (size != expect)
+        throw Exception(
+            DB::ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+            "Function {} in phase {} requires exactly {} arguments but got {} 
arguments",
+            getName(),
+            magic_enum::enum_name(phase),
+            expect,
+            size);
+}
+
+const substrait::Expression::Literal &
+ApproxPercentileParser::assertAndGetLiteral(substrait::AggregationPhase phase, 
const substrait::Expression & expr) const
+{
+    if (!expr.has_literal())
+        throw Exception(
+            DB::ErrorCodes::BAD_ARGUMENTS,
+            "The argument of function {} in phase {} must be literal, but is 
{}",
+            getName(),
+            magic_enum::enum_name(phase),
+            expr.DebugString());
+    return expr.literal();
+}
+
+String ApproxPercentileParser::getCHFunctionName(const CommonFunctionInfo & 
func_info) const
+{
+    const auto & output_type = func_info.output_type;
+    return output_type.has_list() ? "quantilesGK" : "quantileGK";
+}
+
+String ApproxPercentileParser::getCHFunctionName(DB::DataTypes & types) const
+{
+    /// Always invoked during second stage
+    assertArgumentsSize(substrait::AGGREGATION_PHASE_INTERMEDIATE_TO_RESULT, 
types.size(), 2);
+
+    auto type = removeNullable(types[1]);
+    types.resize(1);
+    return isArray(type) ? "quantilesGK" : "quantileGK";
+}
+
+DB::Array ApproxPercentileParser::parseFunctionParameters(
+    const CommonFunctionInfo & func_info, DB::ActionsDAG::NodeRawConstPtrs & 
arg_nodes) const
+{
+    if (func_info.phase == substrait::AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE
+        || func_info.phase == substrait::AGGREGATION_PHASE_INITIAL_TO_RESULT 
|| func_info.phase == substrait::AGGREGATION_PHASE_UNSPECIFIED)
+    {
+        Array params;
+        const auto & arguments = func_info.arguments;
+        assertArgumentsSize(func_info.phase, arguments.size(), 3);
+
+        const auto & accuracy_expr = arguments[2].value();
+        const auto & accuracy_literal =  assertAndGetLiteral(func_info.phase, 
accuracy_expr);
+        auto [type1, field1] = parseLiteral(accuracy_literal);
+        params.emplace_back(std::move(field1));
+
+        const auto & percentage_expr = arguments[1].value();
+        const auto & percentage_literal = assertAndGetLiteral(func_info.phase, 
percentage_expr);
+        auto [type2, field2] = parseLiteral(percentage_literal);
+        if (isArray(type2))
+        {
+            /// Multiple percentages for quantilesGK
+            const Array & percentags = field2.get<Array>();
+            for (const auto & percentage : percentags)
+                params.emplace_back(percentage);
+        }
+        else
+        {
+            /// Single percentage for quantileGK
+            params.emplace_back(std::move(field2));
+        }
+
+        /// Delete percentage and accuracy argument for clickhouse 
compatiability
+        arg_nodes.resize(1);
+        return params;
+    }
+    else
+    {
+        assertArgumentsSize(func_info.phase, arg_nodes.size(), 1);
+        const auto & result_type = arg_nodes[0]->result_type;
+        const auto * aggregate_function_type = 
DB::checkAndGetDataType<DB::DataTypeAggregateFunction>(result_type.get());
+        if (!aggregate_function_type)
+            throw Exception(
+                DB::ErrorCodes::BAD_ARGUMENTS,
+                "The first argument type of function {} in phase {} must be 
AggregateFunction, but is {}",
+                getName(),
+                magic_enum::enum_name(func_info.phase),
+                result_type->getName());
+
+        return aggregate_function_type->getParameters();
+    }
+}
+
+DB::Array ApproxPercentileParser::getDefaultFunctionParameters() const
+{
+    return {10000, 1};
+}
+
+
+static const AggregateFunctionParserRegister<ApproxPercentileParser> 
register_approx_percentile;
+}
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxPercentileParser.h 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxPercentileParser.h
new file mode 100644
index 000000000..37eae3045
--- /dev/null
+++ 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/ApproxPercentileParser.h
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Parser/AggregateFunctionParser.h>
+
+
+/*
+spark: approx_percentile(col, percentage [, accuracy])
+1. When percentage is an array literal, spark returns an array of percentiles, 
corresponding to CH: quantilesGK(accuracy, percentage[0], ...)(col)
+1. Otherwise spark return a single percentile, corresponding to CH: 
quantileGK(accuracy, percentage)(col)
+*/
+
+namespace local_engine
+{
+class ApproxPercentileParser : public AggregateFunctionParser
+{
+public:
+    explicit ApproxPercentileParser(SerializedPlanParser * plan_parser_) : 
AggregateFunctionParser(plan_parser_) { }
+    ~ApproxPercentileParser() override = default;
+    String getName() const override { return name; }
+    static constexpr auto name = "approx_percentile";
+    String getCHFunctionName(const CommonFunctionInfo & func_info) const 
override;
+    String getCHFunctionName(DB::DataTypes & types) const override;
+
+    DB::Array
+    parseFunctionParameters(const CommonFunctionInfo & /*func_info*/, 
DB::ActionsDAG::NodeRawConstPtrs & arg_nodes) const override;
+
+    DB::Array getDefaultFunctionParameters() const override;
+
+private:
+    void assertArgumentsSize(substrait::AggregationPhase phase, size_t size, 
size_t expect) const;
+    const substrait::Expression::Literal & 
assertAndGetLiteral(substrait::AggregationPhase phase, const 
substrait::Expression & expr) const;
+};
+}
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/BloomFilterAggParser.h 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/BloomFilterAggParser.h
index 4bb41ca6d..246516442 100644
--- 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/BloomFilterAggParser.h
+++ 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/BloomFilterAggParser.h
@@ -26,10 +26,10 @@ class AggregateFunctionParserBloomFilterAgg : public 
AggregateFunctionParser
 public:
     explicit AggregateFunctionParserBloomFilterAgg(SerializedPlanParser * 
plan_parser_) : AggregateFunctionParser(plan_parser_) { }
     ~AggregateFunctionParserBloomFilterAgg() override = default;
-    String getName() const override { return "bloom_filter_agg"; }
+    String getName() const override { return name; }
     static constexpr auto name = "bloom_filter_agg";
     String getCHFunctionName(const CommonFunctionInfo &) const override { 
return "groupBloomFilterState"; }
-    String getCHFunctionName(const DB ::DataTypes &) const override { return 
"groupBloomFilterState"; }
+    String getCHFunctionName(DB::DataTypes &) const override { return 
"groupBloomFilterState"; }
 
     DB::Array
     parseFunctionParameters(const CommonFunctionInfo & /*func_info*/, 
DB::ActionsDAG::NodeRawConstPtrs & arg_nodes) const override;
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/CollectListParser.h 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CollectListParser.h
index d7a9c1a5c..60e1b4eae 100644
--- a/cpp-ch/local-engine/Parser/aggregate_function_parser/CollectListParser.h
+++ b/cpp-ch/local-engine/Parser/aggregate_function_parser/CollectListParser.h
@@ -47,12 +47,12 @@ public:
         throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Not implement");
     }
 
-    virtual String getCHFunctionName(const DB::DataTypes &) const override
+    virtual String getCHFunctionName(DB::DataTypes &) const override
     {
         throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Not implement");
     }
     const DB::ActionsDAG::Node * convertNodeTypeIfNeeded(
-        const CommonFunctionInfo &, const DB::ActionsDAG::Node * func_node, 
DB::ActionsDAGPtr & actions_dag, bool /* withNullability */) const override
+        const CommonFunctionInfo &, const DB::ActionsDAG::Node * func_node, 
DB::ActionsDAGPtr & actions_dag, bool /* with_nullability */) const override
     {
         const DB::ActionsDAG::Node * ret_node = func_node;
         if (func_node->result_type->isNullable())
@@ -79,7 +79,7 @@ public:
     static constexpr auto name = "collect_list";
     String getName() const override { return name; }
     String getCHFunctionName(const CommonFunctionInfo &) const override { 
return "groupArray"; }
-    String getCHFunctionName(const DB::DataTypes &) const override { return 
"groupArray"; }
+    String getCHFunctionName(DB::DataTypes &) const override { return 
"groupArray"; }
 };
 
 class CollectSetParser : public CollectFunctionParser
@@ -90,6 +90,6 @@ public:
     static constexpr auto name = "collect_set";
     String getName() const override { return name; }
     String getCHFunctionName(const CommonFunctionInfo &) const override { 
return "groupUniqArray"; }
-    String getCHFunctionName(const DB::DataTypes &) const override { return 
"groupUniqArray"; }
+    String getCHFunctionName(DB::DataTypes &) const override { return 
"groupUniqArray"; }
 };
 }
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.h
 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.h
index c486e16e1..e21581e00 100644
--- 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.h
+++ 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CommonAggregateFunctionParser.h
@@ -31,7 +31,7 @@ namespace local_engine
         String getName() const override { return  #substait_name; } \
         static constexpr auto name = #substait_name; \
         String getCHFunctionName(const CommonFunctionInfo &) const override { 
return #ch_name; } \
-        String getCHFunctionName(const DB::DataTypes &) const override { 
return #ch_name; } \
+        String getCHFunctionName(DB::DataTypes &) const override { return 
#ch_name; } \
     }; \
     static const 
AggregateFunctionParserRegister<AggregateFunctionParser##cls_name> 
register_##cls_name = 
AggregateFunctionParserRegister<AggregateFunctionParser##cls_name>();
 
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.cpp 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.cpp
index 47c9c38f7..6135546f2 100644
--- a/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.cpp
+++ b/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.cpp
@@ -34,10 +34,12 @@ String CountParser::getCHFunctionName(const 
CommonFunctionInfo &) const
 {
     return "count";
 }
-String CountParser::getCHFunctionName(const DB::DataTypes &) const
+
+String CountParser::getCHFunctionName(DB::DataTypes &) const
 {
     return "count";
 }
+
 DB::ActionsDAG::NodeRawConstPtrs CountParser::parseFunctionArguments(
     const CommonFunctionInfo & func_info, const String & /*ch_func_name*/, 
DB::ActionsDAGPtr & actions_dag) const
 {
@@ -45,7 +47,7 @@ DB::ActionsDAG::NodeRawConstPtrs 
CountParser::parseFunctionArguments(
     {
         throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Function {} 
requires at least one argument", getName());
     }
-    
+
     const DB::ActionsDAG::Node * last_arg_node = nullptr;
     if (func_info.arguments.size() == 1)
     {
@@ -82,5 +84,6 @@ DB::ActionsDAG::NodeRawConstPtrs 
CountParser::parseFunctionArguments(
     }
     return {last_arg_node};
 }
+
 static const AggregateFunctionParserRegister<CountParser> register_count;
 }
diff --git a/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.h 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.h
index 7f3411213..a561f87d9 100644
--- a/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.h
+++ b/cpp-ch/local-engine/Parser/aggregate_function_parser/CountParser.h
@@ -28,7 +28,7 @@ public:
     static constexpr auto name = "count";
     String getName() const override { return name; }
     String getCHFunctionName(const CommonFunctionInfo &) const override;
-    String getCHFunctionName(const DB::DataTypes &) const override;
+    String getCHFunctionName(DB::DataTypes &) const override;
     DB::ActionsDAG::NodeRawConstPtrs parseFunctionArguments(
         const CommonFunctionInfo & func_info, const String & ch_func_name, 
DB::ActionsDAGPtr & actions_dag) const override;
 };
diff --git 
a/cpp-ch/local-engine/Parser/aggregate_function_parser/LeadLagParser.h 
b/cpp-ch/local-engine/Parser/aggregate_function_parser/LeadLagParser.h
index 35afbfc47..4fa1c1bbc 100644
--- a/cpp-ch/local-engine/Parser/aggregate_function_parser/LeadLagParser.h
+++ b/cpp-ch/local-engine/Parser/aggregate_function_parser/LeadLagParser.h
@@ -27,7 +27,7 @@ public:
     static constexpr auto name = "lead";
     String getName() const override { return name; }
     String getCHFunctionName(const CommonFunctionInfo &) const override { 
return "leadInFrame"; }
-    String getCHFunctionName(const DB::DataTypes &) const override { return 
"leadInFrame"; }
+    String getCHFunctionName(DB::DataTypes &) const override { return 
"leadInFrame"; }
     DB::ActionsDAG::NodeRawConstPtrs parseFunctionArguments(
         const CommonFunctionInfo & func_info, const String & ch_func_name, 
DB::ActionsDAGPtr & actions_dag) const override;
 };
@@ -40,7 +40,7 @@ public:
     static constexpr auto name = "lag";
     String getName() const override { return name; }
     String getCHFunctionName(const CommonFunctionInfo &) const override { 
return "lagInFrame"; }
-    String getCHFunctionName(const DB::DataTypes &) const override { return 
"lagInFrame"; }
+    String getCHFunctionName(DB::DataTypes &) const override { return 
"lagInFrame"; }
     DB::ActionsDAG::NodeRawConstPtrs parseFunctionArguments(
         const CommonFunctionInfo & func_info, const String & ch_func_name, 
DB::ActionsDAGPtr & actions_dag) const override;
 };
diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp 
b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
index d67161623..54d1d253e 100644
--- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
+++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
@@ -49,47 +49,73 @@ static DB::Block getRealHeader(const DB::Block & header)
     return BlockUtil::buildRowCountHeader();
 }
 
-SourceFromJavaIter::SourceFromJavaIter(DB::ContextPtr context_, DB::Block 
header, jobject java_iter_, bool materialize_input_)
+
+DB::Block * SourceFromJavaIter::peekBlock(JNIEnv * env, jobject java_iter)
+{
+    jboolean has_next = safeCallBooleanMethod(env, java_iter, 
serialized_record_batch_iterator_hasNext);
+    if (has_next)
+    {
+        jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env, 
java_iter, serialized_record_batch_iterator_next));
+        return reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
+    }
+    return nullptr;
+}
+
+
+SourceFromJavaIter::SourceFromJavaIter(
+    DB::ContextPtr context_, DB::Block header, jobject java_iter_, bool 
materialize_input_, DB::Block * first_block_)
     : DB::ISource(getRealHeader(header))
+    , context(context_)
+    , original_header(header)
     , java_iter(java_iter_)
     , materialize_input(materialize_input_)
-    , original_header(header)
-    , context(context_)
+    , first_block(first_block_)
 {
 }
 
 DB::Chunk SourceFromJavaIter::generate()
 {
     GET_JNIENV(env)
-    jboolean has_next = safeCallBooleanMethod(env, java_iter, 
serialized_record_batch_iterator_hasNext);
+    SCOPE_EXIT({CLEAN_JNIENV});
+
     DB::Chunk result;
-    if (has_next)
+    DB::Block * data = nullptr;
+    if (first_block) [[unlikely]]
+    {
+        data = first_block;
+        first_block = nullptr;
+    }
+    else if (jboolean has_next = safeCallBooleanMethod(env, java_iter, 
serialized_record_batch_iterator_hasNext))
     {
         jbyteArray block = static_cast<jbyteArray>(safeCallObjectMethod(env, 
java_iter, serialized_record_batch_iterator_next));
-        DB::Block * data = reinterpret_cast<DB::Block *>(byteArrayToLong(env, 
block));
-        if (materialize_input)
-            materializeBlockInplace(*data);
-        if (data->rows() > 0)
+        data = reinterpret_cast<DB::Block *>(byteArrayToLong(env, block));
+    }
+    else
+        return {};
+
+    /// Post-processing
+    if (materialize_input)
+        materializeBlockInplace(*data);
+
+    if (data->rows() > 0)
+    {
+        size_t rows = data->rows();
+        if (original_header.columns())
         {
-            size_t rows = data->rows();
-            if (original_header.columns())
-            {
-                result.setColumns(data->mutateColumns(), rows);
-                convertNullable(result);
-                auto info = std::make_shared<DB::AggregatedChunkInfo>();
-                info->is_overflows = data->info.is_overflows;
-                info->bucket_num = data->info.bucket_num;
-                result.setChunkInfo(info);
-            }
-            else
-            {
-                result = BlockUtil::buildRowCountChunk(rows);
-                auto info = std::make_shared<DB::AggregatedChunkInfo>();
-                result.setChunkInfo(info);
-            }
+            result.setColumns(data->mutateColumns(), rows);
+            convertNullable(result);
+            auto info = std::make_shared<DB::AggregatedChunkInfo>();
+            info->is_overflows = data->info.is_overflows;
+            info->bucket_num = data->info.bucket_num;
+            result.setChunkInfo(info);
+        }
+        else
+        {
+            result = BlockUtil::buildRowCountChunk(rows);
+            auto info = std::make_shared<DB::AggregatedChunkInfo>();
+            result.setChunkInfo(info);
         }
     }
-    CLEAN_JNIENV
     return result;
 }
 
diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.h 
b/cpp-ch/local-engine/Storages/SourceFromJavaIter.h
index e5cc601cf..6ee02e748 100644
--- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.h
+++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.h
@@ -31,7 +31,9 @@ public:
 
     static Int64 byteArrayToLong(JNIEnv * env, jbyteArray arr);
 
-    SourceFromJavaIter(DB::ContextPtr context_, DB::Block header, jobject 
java_iter_, bool materialize_input_);
+    static DB::Block * peekBlock(JNIEnv * env, jobject java_iter);
+
+    SourceFromJavaIter(DB::ContextPtr context_, DB::Block header, jobject 
java_iter_, bool materialize_input_, DB::Block * peek_block_);
     ~SourceFromJavaIter() override;
 
     String getName() const override { return "SourceFromJavaIter"; }
@@ -41,10 +43,13 @@ private:
     void convertNullable(DB::Chunk & chunk);
     DB::ColumnPtr convertNestedNullable(const DB::ColumnPtr & column, const 
DB::DataTypePtr & target_type);
 
-    jobject java_iter;
-    bool materialize_input;
     DB::ContextPtr context;
     DB::Block original_header;
+    jobject java_iter;
+    bool materialize_input;
+
+    /// The first block read from java iteration to decide exact types of 
columns, especially for AggregateFunctions with parameters.
+    DB::Block * first_block = nullptr;
 };
 
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index 4f60fc54d..21640fe49 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -14,7 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+#include "config.h"
+
 #include <memory>
+#include <shared_mutex>
+#include <thread>
 #include <Disks/IO/AsynchronousBoundedReadBuffer.h>
 #include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
 #include <Disks/IO/ReadBufferFromRemoteFSGather.h>
@@ -22,9 +27,13 @@
 #include <IO/BoundedReadBuffer.h>
 #include <IO/ReadBufferFromFile.h>
 #include <IO/ReadBufferFromS3.h>
+#include <IO/ReadSettings.h>
 #include <IO/S3/getObjectInfo.h>
 #include <IO/S3Common.h>
 #include <IO/SeekableReadBuffer.h>
+#include <Interpreters/Cache/FileCache.h>
+#include <Interpreters/Cache/FileCacheFactory.h>
+#include <Interpreters/Cache/FileCacheSettings.h>
 #include <Interpreters/Context_fwd.h>
 #include <Storages/HDFS/AsynchronousReadBufferFromHDFS.h>
 #include <Storages/HDFS/HDFSCommon.h>
@@ -32,22 +41,17 @@
 #include <Storages/StorageS3Settings.h>
 #include <Storages/SubstraitSource/ReadBufferBuilder.h>
 #include <Storages/SubstraitSource/SubstraitFileSource.h>
-
-#include <sys/stat.h>
-#include <Poco/URI.h>
-#include "IO/ReadSettings.h"
-
+#include <boost/compute/detail/lru_cache.hpp>
 #include <hdfs/hdfs.h>
+#include <sys/stat.h>
 #include <Poco/Logger.h>
+#include <Poco/URI.h>
+#include <Common/CHUtil.h>
 #include <Common/FileCacheConcurrentMap.h>
 #include <Common/Throttler.h>
 #include <Common/logger_useful.h>
 #include <Common/safe_cast.h>
 
-#include <Interpreters/Cache/FileCache.h>
-#include <Interpreters/Cache/FileCacheFactory.h>
-#include <Interpreters/Cache/FileCacheSettings.h>
-
 #if USE_AWS_S3
 #include <aws/core/client/DefaultRetryStrategy.h>
 #include <aws/s3/model/CopyObjectRequest.h>
@@ -55,11 +59,6 @@
 #include <aws/s3/model/ListObjectsV2Request.h>
 #endif
 
-#include <Common/CHUtil.h>
-
-#include <shared_mutex>
-#include <thread>
-#include <boost/compute/detail/lru_cache.hpp>
 
 namespace DB
 {
diff --git 
a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
 
b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
index 42619b407..e9133b6f2 100644
--- 
a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
+++ 
b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionMappings.scala
@@ -276,7 +276,8 @@ object ExpressionMappings {
     Sig[CovSample](COVAR_SAMP),
     Sig[Last](LAST),
     Sig[First](FIRST),
-    Sig[Skewness](SKEWNESS)
+    Sig[Skewness](SKEWNESS),
+    Sig[ApproximatePercentile](APPROX_PERCENTILE)
   ) ++ SparkShimLoader.getSparkShims.aggregateExpressionMappings
 
   /** Mapping Spark window expression to Substrait function name */
diff --git 
a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
index c9aaaee28..bd2cb26ab 100644
--- 
a/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
@@ -230,6 +230,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("SPARK-22520: support code generation for large CaseWhen")
     .exclude("SPARK-24165: CaseWhen/If - nullability of nested types")
     .exclude("SPARK-27671: Fix analysis exception when casting null in nested 
field in struct")
+    .exclude("summary")
     .excludeGlutenTest("distributeBy and localSort")
     .excludeGlutenTest("describe")
     .excludeGlutenTest("Allow leading/trailing whitespace in string before 
casting")
diff --git 
a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
index d000dbecb..019451895 100644
--- 
a/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
@@ -252,6 +252,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("SPARK-22520: support code generation for large CaseWhen")
     .exclude("SPARK-24165: CaseWhen/If - nullability of nested types")
     .exclude("SPARK-27671: Fix analysis exception when casting null in nested 
field in struct")
+    .exclude("summary")
     .excludeGlutenTest("distributeBy and localSort")
     .excludeGlutenTest("describe")
     .excludeGlutenTest("Allow leading/trailing whitespace in string before 
casting")
diff --git 
a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
 
b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
index 9fabee3ca..63dd2f547 100644
--- 
a/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
+++ 
b/gluten-ut/spark34/src/test/scala/io/glutenproject/utils/clickhouse/ClickHouseTestSettings.scala
@@ -250,6 +250,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("SPARK-22520: support code generation for large CaseWhen")
     .exclude("SPARK-24165: CaseWhen/If - nullability of nested types")
     .exclude("SPARK-27671: Fix analysis exception when casting null in nested 
field in struct")
+    .exclude("summary")
     .excludeGlutenTest("distributeBy and localSort")
     .excludeGlutenTest("describe")
     .excludeGlutenTest("Allow leading/trailing whitespace in string before 
casting")
diff --git 
a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala 
b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
index 56e10741a..f61aa3161 100644
--- 
a/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
+++ 
b/shims/common/src/main/scala/io/glutenproject/expression/ExpressionNames.scala
@@ -46,6 +46,7 @@ object ExpressionNames {
   final val FIRST_IGNORE_NULL = "first_ignore_null"
   final val APPROX_DISTINCT = "approx_distinct"
   final val SKEWNESS = "skewness"
+  final val APPROX_PERCENTILE = "approx_percentile"
 
   // Function names used by Substrait plan.
   final val ADD = "add"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to