This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4855a4d7d [GLUTEN-4675][CH] Support write mergetree to s3 (#4676)
4855a4d7d is described below

commit 4855a4d7dcc3c35160b531cc5913d8c41ae41972
Author: Shuai li <loney...@live.cn>
AuthorDate: Fri Mar 22 18:33:16 2024 +0800

    [GLUTEN-4675][CH] Support write mergetree to s3 (#4676)
    
    What changes were proposed in this pull request?
    Support writing MergeTree tables to S3 and HDFS.
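    
    For reference, a minimal usage sketch of what this enables (not part of the
    patch itself): it only highlights the new storage-related settings and assumes
    a SparkSession otherwise configured for the Gluten ClickHouse backend as in
    the new test suites; the endpoint, bucket, credentials, paths and source table
    below are placeholders.
    
        import org.apache.spark.sql.SparkSession
    
        // Declare an S3-backed ClickHouse disk, a local cache disk on top of it,
        // and a storage policy named "__s3_main" through the CH runtime_config.
        val storagePrefix =
          "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration"
        val spark = SparkSession
          .builder()
          .config(s"$storagePrefix.disks.s3.type", "s3_gluten")
          .config(s"$storagePrefix.disks.s3.endpoint", "http://127.0.0.1:9000/my-bucket/")
          .config(s"$storagePrefix.disks.s3.access_key_id", "<access-key>")
          .config(s"$storagePrefix.disks.s3.secret_access_key", "<secret-key>")
          .config(s"$storagePrefix.disks.s3.metadata_path", "/tmp/metadata/s3/")
          .config(s"$storagePrefix.disks.s3_cache.type", "cache")
          .config(s"$storagePrefix.disks.s3_cache.disk", "s3")
          .config(s"$storagePrefix.disks.s3_cache.path", "/tmp/s3_cache/")
          .config(s"$storagePrefix.disks.s3_cache.max_size", "10Gi")
          .config(s"$storagePrefix.policies.__s3_main.volumes", "main")
          .config(s"$storagePrefix.policies.__s3_main.volumes.main.disk", "s3_cache")
          .getOrCreate()
    
        // A MergeTree table whose parts land on S3 via the "__s3_main" policy.
        spark.sql("""
                    |CREATE TABLE IF NOT EXISTS lineitem_mergetree_s3 (l_orderkey bigint, l_comment string)
                    |USING clickhouse
                    |LOCATION 's3a://my-bucket/lineitem_mergetree_s3'
                    |TBLPROPERTIES (storage_policy='__s3_main')
                    |""".stripMargin)
        spark.sql("INSERT INTO lineitem_mergetree_s3 SELECT * FROM my_source_table")
    
    The HDFS path is analogous, using an "hdfs_gluten" disk and the "__hdfs_main"
    storage policy configured in the abstract suite below.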
    
    How was this patch tested?
    UT (new GlutenClickHouseMergeTreeWriteOnHDFSSuite and GlutenClickHouseMergeTreeWriteOnS3Suite).
    ---------
    
    Co-authored-by: liuneng1994 <neng....@kyligence.io>
    Co-authored-by: liuneng <1398775...@qq.com>
---
 backends-clickhouse/pom.xml                        |  28 +-
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 526 ++++++++++++++++++++
 ...ergeTreeWriteOnObjectStorageAbstractSuite.scala | 188 +++++++
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  | 543 +++++++++++++++++++++
 cpp-ch/local-engine/CMakeLists.txt                 |   6 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |  25 +
 cpp-ch/local-engine/Common/CHUtil.h                |   1 +
 cpp-ch/local-engine/Common/MergeTreeTool.cpp       |  27 +-
 cpp-ch/local-engine/Common/MergeTreeTool.h         |   9 +-
 .../Disks/ObjectStorages/GlutenDiskHDFS.cpp        |  75 +++
 .../Disks/ObjectStorages/GlutenDiskHDFS.h          |  63 +++
 .../ObjectStorages/GlutenHDFSObjectStorage.cpp     |  42 ++
 .../Disks/ObjectStorages/GlutenHDFSObjectStorage.h |  53 ++
 .../registerGlutenDiskObjectStorage.cpp            | 120 +++++
 cpp-ch/local-engine/Disks/registerGlutenDisks.cpp  |  99 ++++
 cpp-ch/local-engine/Disks/registerGlutenDisks.h    |  27 +
 cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp  |  13 +-
 cpp-ch/local-engine/Parser/MergeTreeRelParser.h    |   1 -
 .../Storages/CustomStorageMergeTree.cpp            |   3 +-
 .../Storages/Mergetree/MetaDataHelper.cpp          |  95 ++++
 .../Storages/Mergetree/MetaDataHelper.h            |  29 ++
 .../Storages/Mergetree/SparkMergeTreeWriter.cpp    |  55 ++-
 .../Storages/Mergetree/SparkMergeTreeWriter.h      |   6 +-
 cpp-ch/local-engine/local_engine_jni.cpp           |   2 +-
 .../substrait/rel/ExtensionTableNode.java          |   8 +-
 25 files changed, 2008 insertions(+), 36 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index e0f96eda1..f173a61c9 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -19,6 +19,12 @@
       <artifactId>gluten-core</artifactId>
       <version>${project.version}</version>
       <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>guava</artifactId>
+          <groupId>com.google.guava</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>io.glutenproject</groupId>
@@ -50,6 +56,16 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-yarn_${scala.binary.version}</artifactId>
       <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>hadoop-client-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-client-runtime</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
@@ -138,7 +154,11 @@
             <groupId>org.apache.hive</groupId>
             <artifactId>hive-exec</artifactId>
           </exclusion>
-           </exclusions>
+          <exclusion>
+            <artifactId>guava</artifactId>
+            <groupId>com.google.guava</groupId>
+          </exclusion>
+        </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -181,6 +201,12 @@
       <version>1.11.901</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.minio</groupId>
+      <artifactId>minio</artifactId>
+      <version>8.5.9</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
new file mode 100644
index 000000000..88fc977b7
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import java.io.File
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnHDFSSuite
+  extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new Configuration
+    conf.set("fs.defaultFS", HDFS_URL)
+    val fs = FileSystem.get(conf)
+    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+//    FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+//    FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+  }
+
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+//    FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+  }
+
+  test("test mergetree table write") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_hdfs
+                 | select * from lineitem
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_hdfs
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assert(addFiles.size == 1)
+        assert(addFiles.head.rows == 600572)
+    }
+    spark.sql("drop table lineitem_mergetree_hdfs")
+  }
+
+  test("test mergetree write with orderby keys / primary keys") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='l_shipdate,l_orderkey',
+                 |               primaryKey='l_shipdate')
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_orderbykey_hdfs
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_orderbykey_hdfs
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .orderByKeyOption
+            .get
+            .mkString(",")
+            .equals("l_shipdate,l_orderkey"))
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .primaryKeyOption
+            .get
+            .mkString(",")
+            .equals("l_shipdate"))
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assert(addFiles.size == 1)
+        assert(addFiles.head.rows == 600572)
+    }
+    spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
+  }
+
+  test("test mergetree write with partition") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |TBLPROPERTIES (storage_policy='__hdfs_main',
+                 |               orderByKey='l_orderkey',
+                 |               primaryKey='l_orderkey')
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+                 |""".stripMargin)
+
+    // dynamic partitions
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_partition_hdfs
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    // write with dataframe api
+    val source = spark.sql(s"""
+                              |select
+                              | l_orderkey      ,
+                              | l_partkey       ,
+                              | l_suppkey       ,
+                              | l_linenumber    ,
+                              | l_quantity      ,
+                              | l_extendedprice ,
+                              | l_discount      ,
+                              | l_tax           ,
+                              | l_returnflag    ,
+                              | l_linestatus    ,
+                              | l_shipdate      ,
+                              | l_commitdate    ,
+                              | l_receiptdate   ,
+                              | l_shipinstruct  ,
+                              | l_shipmode      ,
+                              | l_comment
+                              | from lineitem
+                              | where l_shipdate BETWEEN date'1993-01-01' AND date'1993-01-10'
+                              |""".stripMargin)
+
+    source.write
+      .format("clickhouse")
+      .mode(SaveMode.Append)
+      .insertInto("lineitem_mergetree_partition_hdfs")
+
+    // static partition
+    spark.sql(s"""
+                 | insert into lineitem_mergetree_partition_hdfs PARTITION (l_returnflag = 'A')
+                 | (l_shipdate,
+                 |  l_orderkey,
+                 |  l_partkey,
+                 |  l_suppkey,
+                 |  l_linenumber,
+                 |  l_quantity,
+                 |  l_extendedprice,
+                 |  l_discount,
+                 |  l_tax,
+                 |  l_linestatus,
+                 |  l_commitdate,
+                 |  l_receiptdate,
+                 |  l_shipinstruct,
+                 |  l_shipmode,
+                 |  l_comment)
+                 | select
+                 |  l_shipdate,
+                 |  l_orderkey,
+                 |  l_partkey,
+                 |  l_suppkey,
+                 |  l_linenumber,
+                 |  l_quantity,
+                 |  l_extendedprice,
+                 |  l_discount,
+                 |  l_tax,
+                 |  l_linestatus,
+                 |  l_commitdate,
+                 |  l_receiptdate,
+                 |  l_shipinstruct,
+                 |  l_shipmode,
+                 |  l_comment from lineitem
+                 |  where l_returnflag = 'A'
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_partition_hdfs
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
+      df =>
+        val result = df.collect()
+        assert(result.length == 4)
+        assert(result(0).getString(0).equals("A"))
+        assert(result(0).getString(1).equals("F"))
+        assert(result(0).getDouble(2) == 7578058.0)
+
+        assert(result(2).getString(0).equals("N"))
+        assert(result(2).getString(1).equals("O"))
+        assert(result(2).getDouble(2) == 7454519.0)
+
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+        assert(mergetreeScan.metrics("numFiles").value == 6)
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .orderByKeyOption
+            .get
+            .mkString(",")
+            .equals("l_orderkey"))
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .primaryKeyOption
+            .get
+            .mkString(",")
+            .equals("l_orderkey"))
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .partitionColumns(0)
+            .equals("l_returnflag"))
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
+        assert(addFiles.size == 6)
+        assert(addFiles.map(_.rows).sum == 750735)
+    }
+    spark.sql("drop table lineitem_mergetree_partition_hdfs")
+  }
+
+  test("test mergetree write with bucket table") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_hdfs
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |CLUSTERED BY (l_orderkey)
+                 |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_orderkey)"} INTO 4 BUCKETS
+                 |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_bucket_hdfs
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_bucket_hdfs
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec(0)
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        if (sparkVersion.equals("3.2")) {
+          assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+        } else {
+          assert(
+            ClickHouseTableV2
+              .getTable(fileIndex.deltaLog)
+              .orderByKeyOption
+              .get
+              .mkString(",")
+              .equals("l_orderkey"))
+        }
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .partitionColumns(0)
+            .equals("l_returnflag"))
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
+        assert(addFiles.size == 12)
+        assert(addFiles.map(_.rows).sum == 600572)
+    }
+    spark.sql("drop table lineitem_mergetree_bucket_hdfs")
+  }
+
+}
+// scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
new file mode 100644
index 000000000..e9b938dbf
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import io.glutenproject.GlutenConfig
+
+import org.apache.spark.sql.SparkSession
+
+import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+  private var _spark: SparkSession = _
+
+  override protected def spark: SparkSession = _spark
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  protected val sparkVersion: String = {
+    val version = SPARK_VERSION_SHORT.split("\\.")
+    version(0) + "." + version(1)
+  }
+
+  val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
+  val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+  val S3_ENDPOINT = "s3://127.0.0.1:9000/"
+  val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
+  val BUCKET_NAME: String = sparkVersion.replace(".", "-")
+  val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
+
+  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
+  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+  val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
+  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+
+  val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
+  val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
+
+  override protected def initializeSession(): Unit = {
+    if (_spark == null) {
+      _spark = SparkSession
+        .builder()
+        .appName("Gluten-UT-RemoteHS")
+        .config(sparkConf)
+        .getOrCreate()
+    }
+  }
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .setMaster("local[2]")
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", 
"false")
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"error")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path",
+        "/tmp/user_defined")
+      .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
+      .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
+      .set("spark.hadoop.fs.s3a.impl", 
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+      .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
+      .set("spark.hadoop.fs.s3a.path.style.access", "true")
+      .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type",
+        "s3_gluten")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint",
+        WHOLE_PATH)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id",
+        S3_ACCESS_KEY)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key",
+        S3_SECRET_KEY)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path",
+        S3_METADATA_PATH)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.type",
+        "cache")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.disk",
+        "s3")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.path",
+        S3_CACHE_PATH)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.max_size",
+        "10Gi")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes",
+        "main")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk",
+        "s3_cache")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type",
+        "hdfs_gluten")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint",
+        HDFS_URL_ENDPOINT + "/")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path",
+        HDFS_METADATA_PATH)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type",
+        "cache")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk",
+        "hdfs")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path",
+        HDFS_CACHE_PATH)
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size",
+        "10Gi")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes",
+        "main")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk",
+        "hdfs_cache")
+      .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", 
"sparkMurmurHash3_32")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit",
+        "false")
+      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica",
 "1")
+  }
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  override protected def afterAll(): Unit = {
+    try {
+      super.afterAll()
+    } finally {
+      try {
+        if (_spark != null) {
+          try {
+            _spark.sessionState.catalog.reset()
+          } finally {
+            _spark.stop()
+            _spark = null
+          }
+        }
+      } finally {
+        SparkSession.clearActiveSession()
+        SparkSession.clearDefaultSession()
+      }
+    }
+
+    FileUtils.forceDelete(new File(basePath))
+    // init GlutenConfig in the next beforeAll
+    GlutenConfig.ins = null
+  }
+}
+// scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
new file mode 100644
index 000000000..45eb8625a
--- /dev/null
+++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import _root_.org.apache.commons.io.FileUtils
+import _root_.org.apache.spark.sql.SaveMode
+import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs, MinioClient, RemoveBucketArgs, RemoveObjectsArgs}
+import io.minio.messages.DeleteObject
+
+import java.io.File
+import java.util
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnS3Suite
+  extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val needCopyParquetToTablePath = true
+
+  override protected val tablesPath: String = basePath + "/tpch-data"
+  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
+  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    val client = MinioClient
+      .builder()
+      .endpoint(MINIO_ENDPOINT)
+      .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
+      .build()
+    if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
+      val results =
+        client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
+      val objects = new util.LinkedList[DeleteObject]()
+      results.forEach(
+        obj => {
+          objects.add(new DeleteObject(obj.get().objectName()))
+        })
+      val removeResults = client.removeObjects(
+        RemoveObjectsArgs.builder().bucket(BUCKET_NAME).objects(objects).build())
+      removeResults.forEach(result => result.get().message())
+      client.removeBucket(RemoveBucketArgs.builder().bucket(BUCKET_NAME).build())
+    }
+    client.makeBucket(MakeBucketArgs.builder().bucket(BUCKET_NAME).build())
+    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    FileUtils.deleteDirectory(new File(S3_CACHE_PATH))
+    FileUtils.forceMkdir(new File(S3_METADATA_PATH))
+    FileUtils.forceMkdir(new File(S3_CACHE_PATH))
+  }
+
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    FileUtils.deleteDirectory(new File(S3_CACHE_PATH))
+  }
+
+  test("test mergetree table write") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_s3;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_s3
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_s3'
+                 |TBLPROPERTIES (storage_policy='__s3_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_s3
+                 | select * from lineitem
+                 |""".stripMargin)
+    FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_s3
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assert(addFiles.size == 1)
+        assert(addFiles.head.rows == 600572)
+    }
+    spark.sql("drop table lineitem_mergetree_s3") // clean up
+  }
+
+  test("test mergetree write with orderby keys / primary keys") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_s3
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |TBLPROPERTIES (storage_policy='__s3_main',
+                 |               orderByKey='l_shipdate,l_orderkey',
+                 |               primaryKey='l_shipdate')
+                 |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_orderbykey_s3'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_orderbykey_s3
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_orderbykey_s3
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .orderByKeyOption
+            .get
+            .mkString(",")
+            .equals("l_shipdate,l_orderkey"))
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .primaryKeyOption
+            .get
+            .mkString(",")
+            .equals("l_shipdate"))
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+        assert(addFiles.size == 1)
+        assert(addFiles.head.rows == 600572)
+    }
+    spark.sql("drop table lineitem_mergetree_orderbykey_s3")
+  }
+
+  test("test mergetree write with partition") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_partition_s3;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_s3
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |TBLPROPERTIES (storage_policy='__s3_main',
+                 |               orderByKey='l_orderkey',
+                 |               primaryKey='l_orderkey')
+                 |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_partition_s3'
+                 |""".stripMargin)
+
+    // dynamic partitions
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_partition_s3
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    // write with dataframe api
+    val source = spark.sql(s"""
+                              |select
+                              | l_orderkey      ,
+                              | l_partkey       ,
+                              | l_suppkey       ,
+                              | l_linenumber    ,
+                              | l_quantity      ,
+                              | l_extendedprice ,
+                              | l_discount      ,
+                              | l_tax           ,
+                              | l_returnflag    ,
+                              | l_linestatus    ,
+                              | l_shipdate      ,
+                              | l_commitdate    ,
+                              | l_receiptdate   ,
+                              | l_shipinstruct  ,
+                              | l_shipmode      ,
+                              | l_comment
+                              | from lineitem
+                              | where l_shipdate BETWEEN date'1993-01-01' AND date'1993-01-10'
+                              |""".stripMargin)
+
+    source.write
+      .format("clickhouse")
+      .mode(SaveMode.Append)
+      .insertInto("lineitem_mergetree_partition_s3")
+
+    // static partition
+    spark.sql(s"""
+                 | insert into lineitem_mergetree_partition_s3 PARTITION (l_returnflag = 'A')
+                 | (l_shipdate,
+                 |  l_orderkey,
+                 |  l_partkey,
+                 |  l_suppkey,
+                 |  l_linenumber,
+                 |  l_quantity,
+                 |  l_extendedprice,
+                 |  l_discount,
+                 |  l_tax,
+                 |  l_linestatus,
+                 |  l_commitdate,
+                 |  l_receiptdate,
+                 |  l_shipinstruct,
+                 |  l_shipmode,
+                 |  l_comment)
+                 | select
+                 |  l_shipdate,
+                 |  l_orderkey,
+                 |  l_partkey,
+                 |  l_suppkey,
+                 |  l_linenumber,
+                 |  l_quantity,
+                 |  l_extendedprice,
+                 |  l_discount,
+                 |  l_tax,
+                 |  l_linestatus,
+                 |  l_commitdate,
+                 |  l_receiptdate,
+                 |  l_shipinstruct,
+                 |  l_shipmode,
+                 |  l_comment from lineitem
+                 |  where l_returnflag = 'A'
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_partition_s3
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
+      df =>
+        val result = df.collect()
+        assert(result.length == 4)
+        assert(result(0).getString(0).equals("A"))
+        assert(result(0).getString(1).equals("F"))
+        assert(result(0).getDouble(2) == 7578058.0)
+
+        assert(result(2).getString(0).equals("N"))
+        assert(result(2).getString(1).equals("O"))
+        assert(result(2).getDouble(2) == 7454519.0)
+
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec.head
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+        assert(mergetreeScan.metrics("numFiles").value == 6)
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .orderByKeyOption
+            .get
+            .mkString(",")
+            .equals("l_orderkey"))
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .primaryKeyOption
+            .get
+            .mkString(",")
+            .equals("l_orderkey"))
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .partitionColumns(0)
+            .equals("l_returnflag"))
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
+        assert(addFiles.size == 6)
+        assert(addFiles.map(_.rows).sum == 750735)
+    }
+    spark.sql("drop table lineitem_mergetree_partition_s3")
+
+  }
+
+  test("test mergetree write with bucket table") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_s3
+                 |(
+                 | l_orderkey      bigint,
+                 | l_partkey       bigint,
+                 | l_suppkey       bigint,
+                 | l_linenumber    bigint,
+                 | l_quantity      double,
+                 | l_extendedprice double,
+                 | l_discount      double,
+                 | l_tax           double,
+                 | l_returnflag    string,
+                 | l_linestatus    string,
+                 | l_shipdate      date,
+                 | l_commitdate    date,
+                 | l_receiptdate   date,
+                 | l_shipinstruct  string,
+                 | l_shipmode      string,
+                 | l_comment       string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (l_returnflag)
+                 |CLUSTERED BY (l_orderkey)
+                 |${if (sparkVersion.equals("3.2")) "" else "SORTED BY (l_orderkey)"} INTO 4 BUCKETS
+                 |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3'
+                 |TBLPROPERTIES (storage_policy='__s3_main')
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_bucket_s3
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         |    l_returnflag,
+         |    l_linestatus,
+         |    sum(l_quantity) AS sum_qty,
+         |    sum(l_extendedprice) AS sum_base_price,
+         |    sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+         |    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
+         |    avg(l_quantity) AS avg_qty,
+         |    avg(l_extendedprice) AS avg_price,
+         |    avg(l_discount) AS avg_disc,
+         |    count(*) AS count_order
+         |FROM
+         |    lineitem_mergetree_bucket_s3
+         |WHERE
+         |    l_shipdate <= date'1998-09-02' - interval 1 day
+         |GROUP BY
+         |    l_returnflag,
+         |    l_linestatus
+         |ORDER BY
+         |    l_returnflag,
+         |    l_linestatus;
+         |
+         |""".stripMargin
+    runTPCHQueryBySQL(1, sqlStr) {
+      df =>
+        val scanExec = collect(df.queryExecution.executedPlan) {
+          case f: FileSourceScanExecTransformer => f
+        }
+        assert(scanExec.size == 1)
+
+        val mergetreeScan = scanExec(0)
+        assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+        val fileIndex = mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+        assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+        if (sparkVersion.equals("3.2")) {
+          assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+        } else {
+          assert(
+            ClickHouseTableV2
+              .getTable(fileIndex.deltaLog)
+              .orderByKeyOption
+              .get
+              .mkString(",")
+              .equals("l_orderkey"))
+        }
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+        assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size == 1)
+        assert(
+          ClickHouseTableV2
+            .getTable(fileIndex.deltaLog)
+            .partitionColumns(0)
+            .equals("l_returnflag"))
+        val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddMergeTreeParts])
+
+        assert(addFiles.size == 12)
+        assert(addFiles.map(_.rows).sum == 600572)
+    }
+    spark.sql("drop table lineitem_mergetree_bucket_s3")
+  }
+
+}
+// scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt
index ae58aa70c..40fe4402f 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -57,6 +57,8 @@ add_headers_and_sources(shuffle Shuffle)
 add_headers_and_sources(operator Operator)
 add_headers_and_sources(jni jni)
 add_headers_and_sources(aggregate_functions AggregateFunctions)
+add_headers_and_sources(disks Disks)
+add_headers_and_sources(disks Disks/ObjectStorages)
 
 include_directories(
         ${JNI_INCLUDE_DIRS}
@@ -89,7 +91,9 @@ add_library(gluten_clickhouse_backend_libs
         ${shuffle_sources}
         ${operator_sources}
         ${aggregate_functions_sources}
-        ${jni_sources})
+        ${jni_sources}
+        ${disks_sources}
+)
 
 target_link_libraries(gluten_clickhouse_backend_libs PUBLIC
         substrait_source
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index b728d7df0..880f6668d 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -67,6 +67,7 @@
 #include <boost/algorithm/string/predicate.hpp>
 
 #include "CHUtil.h"
+#include "Disks/registerGlutenDisks.h"
 
 #include <unistd.h>
 #include <sys/resource.h>
@@ -677,6 +678,19 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
 
         global_context->setTemporaryStoragePath(config->getString("tmp_path", getDefaultPath()), 0);
         global_context->setPath(config->getString("path", "/"));
+
+        String mark_cache_policy = config->getString("mark_cache_policy", DEFAULT_MARK_CACHE_POLICY);
+        size_t mark_cache_size = config->getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
+        double mark_cache_size_ratio = config->getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
+        if (!mark_cache_size)
+            LOG_ERROR(&Poco::Logger::get("CHUtil"), "Too low mark cache size will lead to severe performance degradation.");
+
+        global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio);
+
+        String index_mark_cache_policy = config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
+        size_t index_mark_cache_size = config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
+        double index_mark_cache_size_ratio = config->getDouble("index_mark_cache_size_ratio", DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
+        global_context->setIndexMarkCache(index_mark_cache_policy, index_mark_cache_size, index_mark_cache_size_ratio);
     }
 }
 
@@ -709,11 +723,22 @@ void registerAllFunctions()
         auto & factory = AggregateFunctionCombinatorFactory::instance();
         registerAggregateFunctionCombinatorPartialMerge(factory);
     }
+
+}
+
+void registerGlutenDisks()
+{
     registerDisks(true);
+
+#if USE_AWS_S3
+    registerGlutenDisks(true);
+#endif
 }
 
 void BackendInitializerUtil::registerAllFactories()
 {
+    registerGlutenDisks();
+
     registerReadBufferBuilders();
     registerWriteBufferBuilders();
 
diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h
index f6030485b..308e22422 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -124,6 +124,7 @@ public:
 };
 
 void registerAllFunctions();
+void registerGlutenDisks();
 
 class BackendFinalizerUtil;
 class JNIUtils;
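
The new mark-cache and index-mark-cache settings are ordinary keys in the backend's Poco configuration, with ClickHouse's defaults as fallbacks. A minimal sketch of how such keys could be supplied and read back, using Poco's MapConfiguration purely for illustration (the key names come from the initContexts() hunk above; the values are made up):

    #include <iostream>
    #include <Poco/AutoPtr.h>
    #include <Poco/Util/MapConfiguration.h>

    int main()
    {
        // Hypothetical values; the key names are the ones read in initContexts() above.
        Poco::AutoPtr<Poco::Util::MapConfiguration> config(new Poco::Util::MapConfiguration);
        config->setString("mark_cache_policy", "SLRU");
        config->setString("mark_cache_size", "1073741824");      // 1 GiB
        config->setString("mark_cache_size_ratio", "0.5");
        config->setString("index_mark_cache_size", "268435456"); // 256 MiB

        // The backend reads them back with the same accessors used in the hunk above.
        std::cout << config->getString("mark_cache_policy", "LRU") << ' '
                  << config->getUInt64("mark_cache_size", 0) << ' '
                  << config->getDouble("mark_cache_size_ratio", 0.0) << '\n';
        return 0;
    }
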
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index 2f6b4602d..c5122905e 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -22,6 +22,8 @@
 #include <IO/WriteHelpers.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
 #include <google/protobuf/util/json_util.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/document.h>
 
 using namespace DB;
 
@@ -40,7 +42,10 @@ std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTyp
     metadata->sorting_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context);
     if (table.primary_key.empty())
     {
-        metadata->primary_key.expression = std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>());
+         if (table.order_by_key != MergeTreeTable::TUPLE)
+             metadata->primary_key = KeyDescription::parse(table.order_by_key, metadata->getColumns(), context);
+         else
+            metadata->primary_key.expression = std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>());
     }
     else
     {
@@ -49,13 +54,12 @@ std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTyp
     return metadata;
 }
 
-std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings()
+std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const MergeTreeTableSettings & config)
 {
     auto settings = std::make_unique<DB::MergeTreeSettings>();
-//    settings->set("min_bytes_for_wide_part", Field(0));
-//    settings->set("min_rows_for_wide_part", Field(0));
     settings->set("allow_nullable_key", Field(1));
-    // settings->set("storage_policy", Field("s3_main"));
+    if (!config.storage_policy.empty())
+        settings->set("storage_policy", Field(config.storage_policy));
     return settings;
 }
 
@@ -70,6 +74,15 @@ std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList & names_and_ty
 }
 
 
+void parseTableConfig(MergeTreeTableSettings & settings, String config_json)
+{
+    rapidjson::Document doc;
+    doc.Parse(config_json.c_str());
+    if (doc.HasMember("storage_policy"))
+        settings.storage_policy = doc["storage_policy"].GetString();
+
+}
+
 MergeTreeTable parseMergeTreeTableString(const std::string & info)
 {
 
@@ -97,7 +110,9 @@ MergeTreeTable parseMergeTreeTableString(const std::string & info)
     assertChar('\n', in);
     readString(table.absolute_path, in);
     assertChar('\n', in);
-    readString(table.table_configs_json, in);
+    String json;
+    readString(json, in);
+    parseTableConfig(table.table_configs, json);
     assertChar('\n', in);
     while (!in.eof())
     {
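
On the write path, the table-level configuration travels as a small JSON document (assembled on the Spark side from the table's TBLPROPERTIES), and only storage_policy is consumed so far. A self-contained sketch of that parsing step, mirroring parseTableConfig() above; the payload value '__s3_main' matches the TBLPROPERTIES used in the S3 suite earlier, and the local TableSettings struct stands in for the MergeTreeTableSettings introduced in the header below:

    #include <iostream>
    #include <string>
    #include <rapidjson/document.h>

    // Stand-in for MergeTreeTableSettings / parseTableConfig from the hunks above and below,
    // kept standalone here purely for illustration.
    struct TableSettings
    {
        std::string storage_policy;
    };

    TableSettings parseTableConfig(const std::string & config_json)
    {
        TableSettings settings;
        rapidjson::Document doc;
        doc.Parse(config_json.c_str());
        if (!doc.HasParseError() && doc.IsObject() && doc.HasMember("storage_policy"))
            settings.storage_policy = doc["storage_policy"].GetString();
        return settings;
    }

    int main()
    {
        // Example payload only; '__s3_main' is the policy name used by the S3 test suite above.
        const auto settings = parseTableConfig(R"({"storage_policy":"__s3_main"})");
        std::cout << settings.storage_policy << std::endl; // -> __s3_main
        return 0;
    }
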
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h b/cpp-ch/local-engine/Common/MergeTreeTool.h
index a6af7ebca..bde632f0d 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -43,6 +43,11 @@ struct MergeTreePart
     size_t end;
 };
 
+struct MergeTreeTableSettings
+{
+    String storage_policy = "";
+};
+
 struct MergeTreeTable
 {
     inline static const String TUPLE = "tuple()";
@@ -54,7 +59,7 @@ struct MergeTreeTable
     std::string primary_key = "";
     std::string relative_path;
     std::string absolute_path;
-    std::string table_configs_json;
+    MergeTreeTableSettings table_configs;
     std::vector<MergeTreePart> parts;
     std::unordered_set<String> getPartNames() const;
     RangesInDataParts extractRange(DataPartsVector parts_vector) const;
@@ -62,7 +67,7 @@ struct MergeTreeTable
 
 std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &);
 
-std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings();
+std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const MergeTreeTableSettings & config);
 
 std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList & names_and_types_list);
 
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
new file mode 100644
index 000000000..bff4108f2
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenDiskHDFS.h"
+#include <ranges>
+#include <Parser/SerializedPlanParser.h>
+#if USE_HDFS
+
+namespace local_engine
+{
+using namespace DB;
+
+void GlutenDiskHDFS::createDirectory(const String & path)
+{
+    DiskObjectStorage::createDirectory(path);
+    hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str());
+}
+
+String GlutenDiskHDFS::path2AbsPath(const String & path)
+{
+    return getObjectStorage()->generateObjectKeyForPath(path).serialize();
+}
+
+void GlutenDiskHDFS::createDirectories(const String & path)
+{
+    DiskObjectStorage::createDirectories(path);
+    auto* hdfs = hdfs_object_storage->getHDFSFS();
+    fs::path p = path;
+    std::vector<std::string> paths_created;
+    while (hdfsExists(hdfs, p.c_str()) < 0)
+    {
+        paths_created.push_back(p);
+        if (!p.has_parent_path())
+            break;
+        p = p.parent_path();
+    }
+    for (const auto & path_to_create : paths_created | std::views::reverse)
+        hdfsCreateDirectory(hdfs, path_to_create.c_str());
+}
+
+void GlutenDiskHDFS::removeDirectory(const String & path)
+{
+    DiskObjectStorage::removeDirectory(path);
+    hdfsDelete(hdfs_object_storage->getHDFSFS(), path.c_str(), 1);
+}
+
+DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
+{
+    const auto config_prefix = "storage_configuration.disks." + name;
+    return std::make_shared<GlutenDiskHDFS>(
+        getName(),
+        object_key_prefix,
+        getMetadataStorage(),
+        getObjectStorage(),
+        SerializedPlanParser::global_context->getConfigRef(),
+        config_prefix);
+}
+
+
+}
+#endif
\ No newline at end of file
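
GlutenDiskHDFS::createDirectories above keeps the real HDFS namespace in sync with the metadata layer: it climbs from the requested path toward the root until it hits an existing directory, then creates the missing ones in root-to-leaf order. A standalone sketch of that walk, with a std::set standing in for hdfsExists() and printed mkdir lines standing in for hdfsCreateDirectory() (the paths are made up):

    #include <filesystem>
    #include <iostream>
    #include <ranges>
    #include <set>
    #include <string>
    #include <vector>

    namespace fs = std::filesystem;

    // Stand-in for hdfsExists(): the set holds directories that already exist remotely.
    bool remote_exists(const std::set<std::string> & existing, const fs::path & p)
    {
        return existing.contains(p.generic_string());
    }

    int main()
    {
        const std::set<std::string> existing = {"/", "/data"};
        fs::path p = "/data/db/table/part-0";

        // Same walk as GlutenDiskHDFS::createDirectories: climb until an existing ancestor.
        std::vector<fs::path> to_create;
        while (!remote_exists(existing, p))
        {
            to_create.push_back(p);
            if (!p.has_parent_path())
                break;
            p = p.parent_path();
        }

        // Create the missing directories top-down, as hdfsCreateDirectory would be called.
        for (const auto & dir : to_create | std::views::reverse)
            std::cout << "mkdir " << dir.generic_string() << '\n';
        // prints mkdir for /data/db, /data/db/table, /data/db/table/part-0 (in that order)
        return 0;
    }
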
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
new file mode 100644
index 000000000..9caedaae8
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <config.h>
+
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#if USE_HDFS
+#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
+#endif
+
+namespace local_engine
+{
+#if USE_HDFS
+class GlutenDiskHDFS : public DB::DiskObjectStorage
+{
+public:
+    GlutenDiskHDFS(
+        const String & name_,
+        const String & object_key_prefix_,
+        DB::MetadataStoragePtr metadata_storage_,
+        DB::ObjectStoragePtr object_storage_,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & config_prefix)
+        : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+    {
+        chassert(dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get()) != nullptr);
+        object_key_prefix = object_key_prefix_;
+        hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
+        hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
+    }
+
+    void createDirectory(const String & path) override;
+
+    void createDirectories(const String & path) override;
+
+    void removeDirectory(const String & path) override;
+
+    DB::DiskObjectStoragePtr createDiskObjectStorage() override;
+private:
+    String path2AbsPath(const String & path);
+
+    GlutenHDFSObjectStorage * hdfs_object_storage;
+    String object_key_prefix;
+};
+#endif
+}
+
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
new file mode 100644
index 000000000..3a844a91f
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenHDFSObjectStorage.h"
+#if USE_HDFS
+#include <Storages/HDFS/ReadBufferFromHDFS.h>
+using namespace DB;
+namespace local_engine
+{
+std::unique_ptr<ReadBufferFromFileBase> GlutenHDFSObjectStorage::readObject( /// NOLINT
+    const StoredObject & object,
+    const ReadSettings & read_settings,
+    std::optional<size_t>,
+    std::optional<size_t>) const
+{
+    size_t begin_of_path = object.remote_path.find('/', object.remote_path.find("//") + 2);
+    auto hdfs_path = object.remote_path.substr(begin_of_path);
+    auto hdfs_uri = object.remote_path.substr(0, begin_of_path);
+    return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config, HDFSObjectStorage::patchSettings(read_settings));
+}
+
+DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyForPath(const std::string & path) const
+{
+    return DB::ObjectStorageKey::createAsAbsolute(hdfs_root_path + path);
+}
+}
+#endif
+
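
readObject() above derives the HDFS endpoint and the in-filesystem path from the stored object's remote path by locating the first '/' after the scheme's '//'. A quick standalone check of that split (the namenode host, port and path below are made up):

    #include <cassert>
    #include <string>

    int main()
    {
        // Illustrative remote path; host, port and file layout are assumptions.
        const std::string remote_path = "hdfs://namenode:9000/user/gluten/part-0/metadata.gluten";

        // Same arithmetic as GlutenHDFSObjectStorage::readObject above.
        const size_t begin_of_path = remote_path.find('/', remote_path.find("//") + 2);
        const std::string hdfs_uri = remote_path.substr(0, begin_of_path);
        const std::string hdfs_path = remote_path.substr(begin_of_path);

        assert(hdfs_uri == "hdfs://namenode:9000");
        assert(hdfs_path == "/user/gluten/part-0/metadata.gluten");
        return 0;
    }
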
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
new file mode 100644
index 000000000..1efa441c2
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "config.h"
+
+#if USE_HDFS
+#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
+#endif
+
+namespace local_engine
+{
+
+#if USE_HDFS
+class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
+{
+public:
+    GlutenHDFSObjectStorage(
+            const String & hdfs_root_path_,
+            SettingsPtr settings_,
+            const Poco::Util::AbstractConfiguration & config_)
+        : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_), config(config_)
+    {
+    }
+    std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
+      const DB::StoredObject & object,
+      const DB::ReadSettings & read_settings = DB::ReadSettings{},
+      std::optional<size_t> read_hint = {},
+      std::optional<size_t> file_size = {}) const override;
+    DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
+    hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
+private:
+    const Poco::Util::AbstractConfiguration & config;
+};
+#endif
+
+}
+
+
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
new file mode 100644
index 000000000..8f2008029
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <Disks/ObjectStorages/ObjectStorageFactory.h>
+#if USE_AWS_S3
+#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
+#include <Disks/ObjectStorages/S3/diskSettings.h>
+#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
+#endif
+
+#if USE_HDFS
+#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
+#endif
+
+#include <Interpreters/Context.h>
+#include <Common/Macros.h>
+
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+using namespace DB;
+
+#if USE_AWS_S3
+static S3::URI getS3URI(
+    const Poco::Util::AbstractConfiguration & config,
+    const std::string & config_prefix,
+    const ContextPtr & context)
+{
+    String endpoint = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
+    S3::URI uri(endpoint);
+
+    /// An empty key remains empty.
+    if (!uri.key.empty() && !uri.key.ends_with('/'))
+        uri.key.push_back('/');
+
+    return uri;
+}
+
+void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
+{
+    static constexpr auto disk_type = "s3_gluten";
+
+    factory.registerObjectStorageType(
+        disk_type,
+        [](
+        const std::string & name,
+        const Poco::Util::AbstractConfiguration & config,
+        const std::string & config_prefix,
+        const ContextPtr & context,
+        bool /*skip_access_check*/) -> ObjectStoragePtr
+        {
+            auto uri = getS3URI(config, config_prefix, context);
+            auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
+            auto settings = getSettings(config, config_prefix, context);
+            auto client = getClient(config, config_prefix, context, *settings);
+            auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key);
+
+            auto object_storage = std::make_shared<S3ObjectStorage>(
+                std::move(client),
+                std::move(settings),
+                uri,
+                s3_capabilities,
+                key_generator,
+                name);
+            return object_storage;
+        });
+}
+
+#endif
+
+#if USE_HDFS
+void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
+{
+    factory.registerObjectStorageType(
+        "hdfs_gluten",
+        [](
+            const std::string & /* name */,
+            const Poco::Util::AbstractConfiguration & config,
+            const std::string & config_prefix,
+            const ContextPtr & context,
+            bool /* skip_access_check */) -> ObjectStoragePtr
+        {
+            auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
+            checkHDFSURL(uri);
+            if (uri.back() != '/')
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must ends with '/', but '{}' doesn't.", uri);
+
+            std::unique_ptr<HDFSObjectStorageSettings> settings = std::make_unique<HDFSObjectStorageSettings>(
+                config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
+                config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
+                context->getSettingsRef().hdfs_replication
+            );
+            return std::make_unique<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+        });
+}
+#endif
+}
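
One small detail of getS3URI() above worth noting: a non-empty object key is normalized to end with '/', while an empty key is left untouched. A standalone check of that behaviour (the key values are illustrative):

    #include <cassert>
    #include <string>

    // Same normalization as getS3URI() above: a non-empty key gets a trailing '/'.
    std::string normalizeKey(std::string key)
    {
        if (!key.empty() && !key.ends_with('/'))
            key.push_back('/');
        return key;
    }

    int main()
    {
        assert(normalizeKey("") == "");                            // an empty key remains empty
        assert(normalizeKey("mergetree/data") == "mergetree/data/");
        assert(normalizeKey("mergetree/data/") == "mergetree/data/");
        return 0;
    }
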
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
new file mode 100644
index 000000000..c7e9c5fd3
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <Disks/DiskFactory.h>
+#include <Interpreters/Context.h>
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Disks/ObjectStorages/MetadataStorageFactory.h>
+#include <Disks/ObjectStorages/ObjectStorageFactory.h>
+
+#if USE_HDFS
+#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
+#endif
+
+#include "registerGlutenDisks.h"
+
+namespace local_engine
+{
+#if USE_AWS_S3
+void registerGlutenS3ObjectStorage(DB::ObjectStorageFactory & factory);
+#endif
+
+#if USE_HDFS
+void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
+#endif
+
+void registerGlutenDisks(bool global_skip_access_check)
+{
+    auto & factory = DB::DiskFactory::instance();
+    auto creator = [global_skip_access_check](
+                       const String & name,
+                       const Poco::Util::AbstractConfiguration & config,
+                       const String & config_prefix,
+                       DB::ContextPtr context,
+                       const DB::DisksMap & /* map */,
+                       bool,
+                       bool) -> DB::DiskPtr
+    {
+        bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
+        auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+        auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
+
+        DB::DiskObjectStoragePtr disk = std::make_shared<DB::DiskObjectStorage>(
+            name,
+            object_storage->getCommonKeyPrefix(),
+            std::move(metadata_storage),
+            std::move(object_storage),
+            config,
+            config_prefix);
+
+        disk->startup(context, skip_access_check);
+        return disk;
+    };
+
+    auto & object_factory = DB::ObjectStorageFactory::instance();
+#if USE_AWS_S3
+    registerGlutenS3ObjectStorage(object_factory);
+    factory.registerDiskType("s3_gluten", creator); /// For compatibility
+#endif
+
+#if USE_HDFS
+    auto hdfs_creator = [global_skip_access_check](
+                            const String & name,
+                            const Poco::Util::AbstractConfiguration & config,
+                            const String & config_prefix,
+                            DB::ContextPtr context,
+                            const DB::DisksMap & /* map */,
+                            bool,
+                            bool) -> DB::DiskPtr
+    {
+        bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
+        auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+        auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
+
+        DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskHDFS>(
+            name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);
+
+        disk->startup(context, skip_access_check);
+        return disk;
+    };
+
+    registerGlutenHDFSObjectStorage(object_factory);
+    factory.registerDiskType("hdfs_gluten", hdfs_creator); /// For 
compatibility
+#endif
+}
+}
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.h b/cpp-ch/local-engine/Disks/registerGlutenDisks.h
new file mode 100644
index 000000000..a0c5d96d2
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.h
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace local_engine
+{
+
+/// @param global_skip_access_check - skip access check regardless regardless
+///                                   .skip_access_check config directive (used
+///                                   for clickhouse-disks)
+void registerGlutenDisks(bool global_skip_access_check);
+
+}
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 34746217b..82a64b999 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -23,6 +23,7 @@
 #include <Storages/StorageMergeTreeFactory.h>
 #include <Common/CHUtil.h>
 #include <Common/MergeTreeTool.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
 
 #include "MergeTreeRelParser.h"
 
@@ -61,17 +62,14 @@ static Int64 findMinPosition(const NameSet & condition_table_columns, const Name
 }
 
 CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
-    const substrait::Rel & rel_,
     const substrait::ReadRel::ExtensionTable & extension_table,
     ContextMutablePtr context)
 {
-    const auto & rel = rel_.read();
     google::protobuf::StringValue table;
     table.ParseFromString(extension_table.detail().value());
     auto merge_tree_table = local_engine::parseMergeTreeTableString(table.value());
     DB::Block header;
-    chassert(rel.has_base_schema());
-    header = TypeParser::buildBlockFromNamedStruct(rel.base_schema(), merge_tree_table.low_card_key);
+    header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema, merge_tree_table.low_card_key);
     auto names_and_types_list = header.getNamesAndTypesList();
     auto storage_factory = StorageMergeTreeFactory::instance();
     auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);
@@ -89,8 +87,7 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
                 context,
                 "",
                 MergeTreeData::MergingParams(),
-                buildMergeTreeSettings());
-            custom_storage_merge_tree->loadDataParts(false, std::nullopt);
+                buildMergeTreeSettings(merge_tree_table.table_configs));
             return custom_storage_merge_tree;
         });
     return storage;
@@ -137,13 +134,13 @@ MergeTreeRelParser::parseReadRel(
                 global_context,
                 "",
                 MergeTreeData::MergingParams(),
-                buildMergeTreeSettings());
+                buildMergeTreeSettings(merge_tree_table.table_configs));
             return custom_storage_merge_tree;
         });
 
+    restoreMetaData(storage, merge_tree_table, context);
     for (const auto & [name, sizes] : storage->getColumnSizes())
         column_sizes[name] = sizes.data_compressed;
-
     query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
     query_context.custom_storage_merge_tree = storage;
     auto names_and_types_list = input.getNamesAndTypesList();
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
index 5f86a0cb4..921f3ac00 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
@@ -38,7 +38,6 @@ class MergeTreeRelParser : public RelParser
 {
 public:
     static std::shared_ptr<CustomStorageMergeTree> parseStorage(
-        const substrait::Rel & rel_,
         const substrait::ReadRel::ExtensionTable & extension_table,
         ContextMutablePtr context);
 
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index 9d7a35cb2..780d19cc8 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -90,7 +90,8 @@ CustomStorageMergeTree::CustomStorageMergeTree(
     , writer(*this)
     , reader(*this)
 {
-    initializeDirectoriesAndFormatVersion(relative_data_path_, attach, date_column_name);
+    relative_data_path = relative_data_path_;
+    format_version = 1;
 }
 
 std::atomic<int> CustomStorageMergeTree::part_num;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
new file mode 100644
index 000000000..a7d167385
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MetaDataHelper.h"
+#include <filesystem>
+
+#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+
+using namespace DB;
+
+namespace local_engine
+{
+
+std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
+{
+    std::unordered_map<String, String> result;
+    while (!in.eof())
+    {
+        String name;
+        readString(name, in);
+        assertChar('\t', in);
+        UInt64 size;
+        readIntText(size, in);
+        assertChar('\n', in);
+        String data;
+        data.resize(size);
+        in.read(data.data(), size);
+        result.emplace(name, data);
+    }
+    return result;
+}
+
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context)
+{
+    auto data_disk = storage->getStoragePolicy()->getAnyDisk();
+    if (!data_disk->isRemote())
+        return;
+
+    std::unordered_set<String> not_exists_part;
+    DB::MetadataStorageFromDisk * metadata_storage = static_cast<MetadataStorageFromDisk *>(data_disk->getMetadataStorage().get());
+    auto metadata_disk = metadata_storage->getDisk();
+    auto table_path = std::filesystem::path(mergeTreeTable.relative_path);
+    for (const auto & part : mergeTreeTable.getPartNames())
+    {
+        auto part_path = table_path / part;
+        if (!metadata_disk->exists(part_path))
+            not_exists_part.emplace(part);
+    }
+
+    if (not_exists_part.empty())
+        return;
+
+    if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout))
+    {
+        auto s3 = data_disk->getObjectStorage();
+
+        if (!metadata_disk->exists(table_path))
+            metadata_disk->createDirectories(table_path.generic_string());
+
+        for (const auto & part : not_exists_part)
+        {
+            auto part_path = table_path / part;
+            auto metadata_file_path = part_path / "metadata.gluten";
+
+            if (metadata_disk->exists(part_path))
+                continue;
+            else
+                metadata_disk->createDirectories(part_path);
+            auto key = s3->generateObjectKeyForPath(metadata_file_path.generic_string());
+            StoredObject metadata_object(key.serialize());
+            auto part_metadata = extractPartMetaData(*s3->readObject(metadata_object));
+            for (const auto & item : part_metadata)
+            {
+                auto item_path = part_path / item.first;
+                auto out = metadata_disk->writeFile(item_path);
+                out->write(item.second.data(), item.second.size());
+            }
+        }
+    }
+}
+
+}
\ No newline at end of file
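
For reference, the metadata.gluten object parsed by extractPartMetaData() above (and written by SparkMergeTreeWriter::saveFileStatus() further down) is a flat sequence of records of the form name, tab, size, newline, raw bytes. A self-contained round-trip sketch of that layout, using string streams in place of ClickHouse's ReadBuffer/WriteBuffer; the entry names and contents are illustrative:

    #include <iostream>
    #include <sstream>
    #include <string>
    #include <unordered_map>

    // Write one record per metadata file: "<name>\t<size>\n<raw bytes>".
    void writeEntry(std::ostream & out, const std::string & name, const std::string & data)
    {
        out << name << '\t' << data.size() << '\n' << data;
    }

    // Counterpart of extractPartMetaData() above, reading from a std::istream.
    std::unordered_map<std::string, std::string> readEntries(std::istream & in)
    {
        std::unordered_map<std::string, std::string> result;
        std::string name;
        while (std::getline(in, name, '\t'))
        {
            size_t size = 0;
            in >> size;
            in.ignore(1); // skip the '\n' after the size
            std::string data(size, '\0');
            in.read(data.data(), static_cast<std::streamsize>(size));
            result.emplace(name, data);
        }
        return result;
    }

    int main()
    {
        std::ostringstream out;
        writeEntry(out, "columns.txt", "columns format version: 1\n...");
        writeEntry(out, "count.txt", "600572");

        std::istringstream in(out.str());
        for (const auto & [name, data] : readEntries(in))
            std::cout << name << " -> " << data.size() << " bytes\n";
        return 0;
    }
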
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
new file mode 100644
index 000000000..47c5d615d
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <Common/MergeTreeTool.h>
+#include <Storages/StorageMergeTreeFactory.h>
+
+namespace local_engine
+{
+
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context);
+
+}
+
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index f5c9a1338..8df171f99 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -15,8 +15,11 @@
  * limitations under the License.
  */
 #include "SparkMergeTreeWriter.h"
+
 #include <Disks/createVolume.h>
+#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
 #include <Interpreters/ActionsDAG.h>
+#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
 #include <rapidjson/prettywriter.h>
 
 using namespace DB;
@@ -49,29 +52,59 @@ void SparkMergeTreeWriter::write(DB::Block & block)
     }
 
     auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block), 10, metadata_snapshot, context);
-    for (auto & item : blocks_with_partition)
-    {
-        auto temp_part = writeTempPart(item, metadata_snapshot, context);
-        temp_part.finalize();
-        new_parts.emplace_back(temp_part.part);
+        for (auto & item : blocks_with_partition)
+        {
+            new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part);
         part_num++;
     }
 }
 
 void SparkMergeTreeWriter::finalize()
 {
+    auto block = squashing_transform->add({});
+    if (block.rows())
+    {
+        auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context);
+        for (auto & item : blocks_with_partition)
+            new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part);
+    }
+}
+
+DB::MergeTreeDataWriter::TemporaryPart
+SparkMergeTreeWriter::writeTempPartAndFinalize(
+    DB::BlockWithPartition & block_with_partition,
+    const DB::StorageMetadataPtr & metadata_snapshot)
+{
+    auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
+    temp_part.finalize();
+    saveFileStatus(temp_part);
+    return temp_part;
+}
 
-    auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add({}), 10, metadata_snapshot, context);
-    for (auto & item : blocks_with_partition)
+void SparkMergeTreeWriter::saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const
+{
+    auto & data_part_storage = temp_part.part->getDataPartStorage();
+
+    const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
+    if (!disk->isRemote()) return;
+    if (auto *const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
     {
-        auto temp_part = writeTempPart(item, metadata_snapshot, context);
-        temp_part.finalize();
-        new_parts.emplace_back(temp_part.part);
+        const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
+        for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
+        {
+            auto content = disk_metadata->readFileToString(it->path());
+            writeString(it->name(), *out);
+            writeChar('\t', *out);
+            writeIntText(content.length(), *out);
+            writeChar('\n', *out);
+            writeString(content, *out);
+        }
+        out->finalize();
     }
 }
 
 MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
-    BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot, ContextPtr context)
+    BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
 {
     MergeTreeDataWriter::TemporaryPart temp_part;
     Block & block = block_with_partition.block;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 5c63d1fef..d316f208e 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -82,7 +82,11 @@ public:
 
 private:
     DB::MergeTreeDataWriter::TemporaryPart
-    writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot, DB::ContextPtr context);
+    writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
+    DB::MergeTreeDataWriter::TemporaryPart
+    writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
+    void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const;
+
     String uuid;
     String partition_dir;
     String bucket_dir;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index d1a675f49..7008c66f7 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1020,7 +1020,7 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
         local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
 
     auto storage = local_engine::MergeTreeRelParser::parseStorage(
-        plan_ptr->relations()[0].root().input(), extension_table, local_engine::SerializedPlanParser::global_context);
+        extension_table, local_engine::SerializedPlanParser::global_context);
     auto uuid = uuid_str + "_" + task_id;
     auto * writer = new local_engine::SparkMergeTreeWriter(
         *storage, storage->getInMemoryMetadataPtr(), local_engine::SerializedPlanParser::global_context, uuid, partition_dir, bucket_dir);
diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
index bf942ef26..f07e6fccb 100644
--- a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
+++ b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.protobuf.StringValue;
 import io.substrait.proto.ReadRel;
 
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -71,10 +72,11 @@ public class ExtensionTableNode implements SplitInfo {
     this.maxPartsNum = maxPartsNum;
     this.database = database;
     this.tableName = tableName;
-    if (relativePath.contains(":/")) { // file:/tmp/xxx => tmp/xxx
-      this.relativePath = relativePath.substring(relativePath.indexOf(":/") + 
2);
+    URI table_uri = URI.create(relativePath);
+    if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx
+      this.relativePath = table_uri.getPath().substring(1);
     } else {
-      this.relativePath = relativePath;
+      this.relativePath = table_uri.getPath();
     }
     this.absolutePath = absolutePath;
     this.tableSchemaJson = tableSchemaJson;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org
