This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 371d448c3 [GLUTEN-6840][CH] Enable cache files for hdfs (#6841)
371d448c3 is described below

commit 371d448c3b512b1cbb3272e932cb7a57b9da4ba6
Author: Shuai li <[email protected]>
AuthorDate: Wed Aug 21 14:40:54 2024 +0800

    [GLUTEN-6840][CH] Enable cache files for hdfs (#6841)
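
    This change adds a CACHE FILES SQL command, parsed by the new
    GlutenCacheFileSqlBase grammar and dispatched to executors over RPC
    (GlutenFilesCacheLoad), so that files on HDFS can be pre-loaded into the
    local filesystem cache. A minimal usage sketch based on the grammar added
    below; the path and the property name are illustrative, not taken from
    this commit:

        spark.sql(
          "CACHE FILES ASYNC SELECT * FROM 'hdfs://ns1/user/hive/warehouse/t1' " +
            "CACHEPROPERTIES (k1 = 'v1')")

    With ASYNC the command only reports whether the cache jobs were triggered;
    otherwise it waits for the executors to finish (see GlutenCacheBase.getResult
    below).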
---
 .../gluten/sql/parser/GlutenCacheFileSqlBase.g4    | 224 +++++++++++++++++++++
 .../gluten/parser/GlutenCacheFilesSqlParser.scala  |  61 ++++++
 .../gluten/parser/GlutenCacheFilesSqlParser.scala  |  65 ++++++
 .../gluten/parser/GlutenCacheFilesSqlParser.scala  |  65 ++++++
 .../gluten/execution/CHNativeCacheManager.java     |   5 +
 .../org/apache/gluten/execution/CacheResult.java   |   4 +-
 .../org/apache/gluten/metrics/MetricsStep.java     |  66 ++++++
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  17 +-
 .../backendsapi/clickhouse/CHMetricsApi.scala      |  20 +-
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   4 +-
 .../metrics/FileSourceScanMetricsUpdater.scala     |  24 ++-
 ...se.scala => GlutenCacheFileSqlParserBase.scala} | 132 +++++-------
 .../parser/GlutenClickhouseSqlParserBase.scala     |  20 +-
 .../apache/gluten/parser/UpperCaseCharStream.scala |  38 ++++
 .../apache/spark/rpc/GlutenExecutorEndpoint.scala  |  13 +-
 .../org/apache/spark/rpc/GlutenRpcMessages.scala   |   6 +-
 .../commands/GlutenCHCacheDataCommand.scala        | 100 +--------
 .../sql/execution/commands/GlutenCacheBase.scala   | 123 +++++++++++
 .../commands/GlutenCacheFilesCommand.scala         | 188 +++++++++++++++++
 ...lutenClickHouseWholeStageTransformerSuite.scala |   3 +
 .../execution/tpch/GlutenClickHouseHDFSSuite.scala | 134 ++++++++++++
 cpp-ch/local-engine/Common/CHUtil.cpp              |  36 ++--
 cpp-ch/local-engine/Common/CHUtil.h                |   2 +
 cpp-ch/local-engine/Common/GlutenConfig.h          |  30 ++-
 cpp-ch/local-engine/Common/QueryContext.cpp        |  13 +-
 cpp-ch/local-engine/Common/QueryContext.h          |   1 +
 cpp-ch/local-engine/Parser/RelMetric.cpp           |  55 ++++-
 .../local-engine/Storages/Cache/CacheManager.cpp   |  67 +++++-
 cpp-ch/local-engine/Storages/Cache/CacheManager.h  |   8 +
 .../Storages/SubstraitSource/ReadBufferBuilder.cpp | 163 ++++++++++-----
 .../Storages/SubstraitSource/ReadBufferBuilder.h   |  25 +--
 cpp-ch/local-engine/local_engine_jni.cpp           |  24 +++
 32 files changed, 1427 insertions(+), 309 deletions(-)

diff --git a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4 b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
new file mode 100644
index 000000000..abdb0cdbf
--- /dev/null
+++ b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar GlutenCacheFileSqlBase;
+
+@members {
+  /**
+   * Verify whether current token is a valid decimal token (which contains dot).
+   * Returns true if the character that follows the token is not a digit or letter or underscore.
+   *
+   * For example:
+   * For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
+   * For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
+   * For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
+   * For char stream "12.0D 34.E2+0.12 ", 12.0D is a valid decimal token because it is followed
+   * by a space. 34.E2 is a valid decimal token because it is followed by symbol '+',
+   * which is not a digit or letter or underscore.
+   */
+  public boolean isValidDecimal() {
+    int nextChar = _input.LA(1);
+    if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
+      nextChar == '_') {
+      return false;
+    } else {
+      return true;
+    }
+  }
+}
+
+tokens {
+    DELIMITER
+}
+
+singleStatement
+    : statement ';'* EOF
+    ;
+
+statement
+    : CACHE FILES ASYNC? SELECT selectedColumns=selectedColumnNames
+        FROM (path=STRING)
+        (CACHEPROPERTIES cacheProps=propertyList)?    #cacheFiles
+    | .*?                                             #passThrough
+    ;
+
+selectedColumnNames
+    : ASTERISK
+    | identifier (COMMA identifier)*
+    ;
+
+propertyList
+    : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+    ;
+
+property
+    : key=propertyKey (EQ? value=propertyValue)?
+    ;
+
+propertyKey
+    : identifier (DOT identifier)*
+    | stringLit
+    ;
+
+propertyValue
+    : INTEGER_VALUE
+    | DECIMAL_VALUE
+    | booleanValue
+    | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN
+    | value=stringLit
+    ;
+
+stringLit
+    : STRING
+    | DOUBLEQUOTED_STRING
+    ;
+
+booleanValue
+    : TRUE | FALSE
+    ;
+
+identifier
+    : IDENTIFIER             #unquotedIdentifier
+    | quotedIdentifier       #quotedIdentifierAlternative
+    | nonReserved            #unquotedIdentifier
+    ;
+
+quotedIdentifier
+    : BACKQUOTED_IDENTIFIER
+    ;
+
+// Add keywords here so that people's queries don't break if they have a column name as one of
+// these tokens
+nonReserved
+    : CACHE | FILES | ASYNC
+    | SELECT | FOR | AFTER | CACHEPROPERTIES
+    | TIMESTAMP | AS | OF | DATE_PARTITION
+    |
+    ;
+
+// Define how the keywords above should appear in a user's SQL statement.
+CACHE: 'CACHE';
+META: 'META';
+ASYNC: 'ASYNC';
+SELECT: 'SELECT';
+COMMA: ',';
+FOR: 'FOR';
+FROM: 'FROM';
+AFTER: 'AFTER';
+CACHEPROPERTIES: 'CACHEPROPERTIES';
+DOT: '.';
+ASTERISK: '*';
+TIMESTAMP: 'TIMESTAMP';
+AS: 'AS';
+OF: 'OF';
+DATE_PARTITION: 'DATE_PARTITION';
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+FILES: 'FILES';
+
+EQ  : '=' | '==';
+NSEQ: '<=>';
+NEQ : '<>';
+NEQJ: '!=';
+LTE : '<=' | '!>';
+GTE : '>=' | '!<';
+CONCAT_PIPE: '||';
+
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+DOUBLEQUOTED_STRING
+    :'"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    ;
+
+BIGINT_LITERAL
+    : DIGIT+ 'L'
+    ;
+
+SMALLINT_LITERAL
+    : DIGIT+ 'S'
+    ;
+
+TINYINT_LITERAL
+    : DIGIT+ 'Y'
+    ;
+
+INTEGER_VALUE
+    : DIGIT+
+    ;
+
+DECIMAL_VALUE
+    : DIGIT+ EXPONENT
+    | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}?
+    ;
+
+DOUBLE_LITERAL
+    : DIGIT+ EXPONENT? 'D'
+    | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+    ;
+
+BIGDECIMAL_LITERAL
+    : DIGIT+ EXPONENT? 'BD'
+    | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+    ;
+
+IDENTIFIER
+    : (LETTER | DIGIT | '_')+
+    ;
+
+BACKQUOTED_IDENTIFIER
+    : '`' ( ~'`' | '``' )* '`'
+    ;
+
+fragment DECIMAL_DIGITS
+    : DIGIT+ '.' DIGIT*
+    | '.' DIGIT+
+    ;
+
+fragment EXPONENT
+    : 'E' [+-]? DIGIT+
+    ;
+
+fragment DIGIT
+    : [0-9]
+    ;
+
+fragment LETTER
+    : [A-Z]
+    ;
+
+SIMPLE_COMMENT
+    : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
+    ;
+
+BRACKETED_COMMENT
+    : '/*' .*? '*/' -> channel(HIDDEN)
+    ;
+
+WS  : [ \r\n\t]+ -> channel(HIDDEN)
+    ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+    : .
+    ;
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 000000000..48d26498f
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+  extends GlutenCacheFileSqlParserBase {
+
+  override def parsePlan(sqlText: String): LogicalPlan =
+    parse(sqlText) {
+      parser =>
+        astBuilder.visit(parser.singleStatement()) match {
+          case plan: LogicalPlan => plan
+          case _ => delegate.parsePlan(sqlText)
+        }
+    }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+}
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 000000000..9a0cde772
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+  extends GlutenCacheFileSqlParserBase {
+
+  override def parsePlan(sqlText: String): LogicalPlan =
+    parse(sqlText) {
+      parser =>
+        astBuilder.visit(parser.singleStatement()) match {
+          case plan: LogicalPlan => plan
+          case _ => delegate.parsePlan(sqlText)
+        }
+    }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+
+  override def parseQuery(sqlText: String): LogicalPlan = {
+    delegate.parseQuery(sqlText)
+  }
+}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 000000000..9a0cde772
--- /dev/null
+++ b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+  extends GlutenCacheFileSqlParserBase {
+
+  override def parsePlan(sqlText: String): LogicalPlan =
+    parse(sqlText) {
+      parser =>
+        astBuilder.visit(parser.singleStatement()) match {
+          case plan: LogicalPlan => plan
+          case _ => delegate.parsePlan(sqlText)
+        }
+    }
+
+  override def parseExpression(sqlText: String): Expression = {
+    delegate.parseExpression(sqlText)
+  }
+
+  override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+    delegate.parseTableIdentifier(sqlText)
+  }
+
+  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+    delegate.parseFunctionIdentifier(sqlText)
+  }
+
+  override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+    delegate.parseMultipartIdentifier(sqlText)
+  }
+
+  override def parseTableSchema(sqlText: String): StructType = {
+    delegate.parseTableSchema(sqlText)
+  }
+
+  override def parseDataType(sqlText: String): DataType = {
+    delegate.parseDataType(sqlText)
+  }
+
+  override def parseQuery(sqlText: String): LogicalPlan = {
+    delegate.parseQuery(sqlText)
+  }
+}
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 7b765924f..4033d8c6b 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -30,4 +30,9 @@ public class CHNativeCacheManager {
   }
 
   private static native CacheResult nativeGetCacheStatus(String jobId);
+
+  public static native String nativeCacheFiles(byte[] files);
+
+  // only for ut
+  public static native void removeFiles(String file, String cacheName);
 }
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
index 0fa69e0d0..b6d538039 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
@@ -16,7 +16,9 @@
  */
 package org.apache.gluten.execution;
 
-public class CacheResult {
+import java.io.Serializable;
+
+public class CacheResult implements Serializable {
   public enum Status {
     RUNNING(0),
     SUCCESS(1),
diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
index f95aaa323..39dd94965 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
@@ -35,6 +35,24 @@ public class MetricsStep {
   @JsonProperty("selected_marks")
   protected long selectedMarks;
 
+  @JsonProperty("read_cache_hits")
+  protected long readCacheHits;
+
+  @JsonProperty("miss_cache_hits")
+  protected long missCacheHits;
+
+  @JsonProperty("read_cache_bytes")
+  protected long readCacheBytes;
+
+  @JsonProperty("read_miss_bytes")
+  protected long readMissBytes;
+
+  @JsonProperty("read_cache_millisecond")
+  protected long readCacheMillisecond;
+
+  @JsonProperty("miss_cache_millisecond")
+  protected long missCacheMillisecond;
+
   public String getName() {
     return name;
   }
@@ -82,4 +100,52 @@ public class MetricsStep {
   public long getSelectedMarksPk() {
     return selectedMarksPk;
   }
+
+  public long getReadCacheHits() {
+    return readCacheHits;
+  }
+
+  public void setReadCacheHits(long readCacheHits) {
+    this.readCacheHits = readCacheHits;
+  }
+
+  public long getMissCacheHits() {
+    return missCacheHits;
+  }
+
+  public void setMissCacheHits(long missCacheHits) {
+    this.missCacheHits = missCacheHits;
+  }
+
+  public long getReadCacheBytes() {
+    return readCacheBytes;
+  }
+
+  public void setReadCacheBytes(long readCacheBytes) {
+    this.readCacheBytes = readCacheBytes;
+  }
+
+  public long getReadMissBytes() {
+    return readMissBytes;
+  }
+
+  public void setReadMissBytes(long readMissBytes) {
+    this.readMissBytes = readMissBytes;
+  }
+
+  public long getReadCacheMillisecond() {
+    return readCacheMillisecond;
+  }
+
+  public void setReadCacheMillisecond(long readCacheMillisecond) {
+    this.readCacheMillisecond = readCacheMillisecond;
+  }
+
+  public long getMissCacheMillisecond() {
+    return missCacheMillisecond;
+  }
+
+  public void setMissCacheMillisecond(long missCacheMillisecond) {
+    this.missCacheMillisecond = missCacheMillisecond;
+  }
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 7519580b9..c77d57262 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.execution._
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.memory.CHThreadGroup
 import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
+import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.plan.PlanNode
 import org.apache.gluten.substrait.rel._
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -164,6 +165,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
         val paths = new JArrayList[String]()
         val starts = new JArrayList[JLong]()
         val lengths = new JArrayList[JLong]()
+        val fileSizes = new JArrayList[JLong]()
+        val modificationTimes = new JArrayList[JLong]()
         val partitionColumns = new JArrayList[JMap[String, String]]
         f.files.foreach {
           file =>
@@ -173,6 +176,16 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
             // TODO: Support custom partition location
             val partitionColumn = new JHashMap[String, String]()
             partitionColumns.add(partitionColumn)
+            val (fileSize, modificationTime) =
+              SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
+            (fileSize, modificationTime) match {
+              case (Some(size), Some(time)) =>
+                fileSizes.add(JLong.valueOf(size))
+                modificationTimes.add(JLong.valueOf(time))
+              case _ =>
+                fileSizes.add(0)
+                modificationTimes.add(0)
+            }
         }
         val preferredLocations =
          CHAffinity.getFilePartitionLocations(paths.asScala.toArray, f.preferredLocations())
@@ -181,8 +194,8 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
           paths,
           starts,
           lengths,
-          new JArrayList[JLong](),
-          new JArrayList[JLong](),
+          fileSizes,
+          modificationTimes,
           partitionColumns,
           new JArrayList[JMap[String, String]](),
           fileFormat,
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 9058bffd8..c53448cdd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -128,7 +128,25 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
       "selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected marks primary"),
       "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected marks"),
-      "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary")
+      "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks primary"),
+      "readCacheHits" -> SQLMetrics.createMetric(
+        sparkContext,
+        "Number of times the read from filesystem cache hit the cache"),
+      "missCacheHits" -> SQLMetrics.createMetric(
+        sparkContext,
+        "Number of times the read from filesystem cache miss the cache"),
+      "readCacheBytes" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "Bytes read from filesystem cache"),
+      "readMissBytes" -> SQLMetrics.createSizeMetric(
+        sparkContext,
+        "Bytes read from filesystem cache source (from remote fs, etc)"),
+      "readCacheMillisecond" -> SQLMetrics.createTimingMetric(
+        sparkContext,
+        "Time reading from filesystem cache"),
+      "missCacheMillisecond" -> SQLMetrics.createTimingMetric(
+        sparkContext,
+        "Time reading from filesystem cache source (from remote filesystem, 
etc)")
     )
 
   override def genFileSourceScanTransformerMetricsUpdater(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 177d6a6f0..f4a7522d3 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
 import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
 import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
 import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
-import org.apache.gluten.parser.GlutenClickhouseSqlParser
+import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, EqualToRewrite}
@@ -44,6 +44,8 @@ private object CHRuleApi {
   def injectSpark(injector: SparkInjector): Unit = {
     // Regular Spark rules.
     injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
+    injector.injectParser(
+      (spark, parserInterface) => new GlutenCacheFilesSqlParser(spark, parserInterface))
     injector.injectParser(
       (spark, parserInterface) => new GlutenClickhouseSqlParser(spark, parserInterface))
     injector.injectResolutionRule(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index f44c5ed1a..4dcae8feb 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -35,9 +35,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
   val extraTime: SQLMetric = metrics("extraTime")
   val inputWaitTime: SQLMetric = metrics("inputWaitTime")
   val outputWaitTime: SQLMetric = metrics("outputWaitTime")
-  val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
-  val selected_marks: SQLMetric = metrics("selectedMarks")
-  val total_marks_pk: SQLMetric = metrics("totalMarksPk")
+  val selectedMarksPK: SQLMetric = metrics("selectedMarksPk")
+  val selectedMarks: SQLMetric = metrics("selectedMarks")
+  val totalMarksPK: SQLMetric = metrics("totalMarksPk")
+  val readCacheHits: SQLMetric = metrics("readCacheHits")
+  val missCacheHits: SQLMetric = metrics("missCacheHits")
+  val readCacheBytes: SQLMetric = metrics("readCacheBytes")
+  val readMissBytes: SQLMetric = metrics("readMissBytes")
+  val readCacheMillisecond: SQLMetric = metrics("readCacheMillisecond")
+  val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond")
 
   override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
     // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -56,9 +62,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
 
         metricsData.getSteps.forEach(
           step => {
-            selected_marks_pk += step.selectedMarksPk
-            selected_marks += step.selectedMarks
-            total_marks_pk += step.totalMarksPk
+            selectedMarksPK += step.selectedMarksPk
+            selectedMarks += step.selectedMarks
+            totalMarksPK += step.totalMarksPk
+            readCacheHits += step.readCacheHits
+            missCacheHits += step.missCacheHits
+            readCacheBytes += step.readCacheBytes
+            readMissBytes += step.readMissBytes
+            readCacheMillisecond += step.readCacheMillisecond
+            missCacheMillisecond += step.missCacheMillisecond
           })
 
         MetricsUtil.updateExtraTimeMetric(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
similarity index 57%
copy from backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
copy to backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
index 18fc102be..b031dcf7a 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
@@ -16,40 +16,38 @@
  */
 package org.apache.gluten.parser
 
-import org.apache.gluten.sql.parser.{GlutenClickhouseSqlBaseBaseListener, GlutenClickhouseSqlBaseBaseVisitor, GlutenClickhouseSqlBaseLexer, GlutenClickhouseSqlBaseParser}
-import org.apache.gluten.sql.parser.GlutenClickhouseSqlBaseParser._
+import org.apache.gluten.sql.parser.{GlutenCacheFileSqlBaseBaseListener, GlutenCacheFileSqlBaseBaseVisitor, GlutenCacheFileSqlBaseLexer, GlutenCacheFileSqlBaseParser}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.{ParseErrorListener, ParseException, ParserInterface}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand
+import org.apache.spark.sql.execution.commands.GlutenCacheFilesCommand
 import org.apache.spark.sql.internal.VariableSubstitution
 
 import org.antlr.v4.runtime._
 import org.antlr.v4.runtime.atn.PredictionMode
-import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.misc.ParseCancellationException
 import org.antlr.v4.runtime.tree.TerminalNodeImpl
 
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 
-trait GlutenClickhouseSqlParserBase extends ParserInterface {
-
-  protected val astBuilder = new GlutenClickhouseSqlAstBuilder
+trait GlutenCacheFileSqlParserBase extends ParserInterface {
+  protected val astBuilder = new GlutenCacheFileSqlAstBuilder
   protected val substitution = new VariableSubstitution
 
-  protected def parse[T](command: String)(toResult: 
GlutenClickhouseSqlBaseParser => T): T = {
-    val lexer = new GlutenClickhouseSqlBaseLexer(
+  protected def parse[T](command: String)(toResult: 
GlutenCacheFileSqlBaseParser => T): T = {
+    val lexer = new GlutenCacheFileSqlBaseLexer(
       new 
UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command))))
     lexer.removeErrorListeners()
     lexer.addErrorListener(ParseErrorListener)
 
     val tokenStream = new CommonTokenStream(lexer)
-    val parser = new GlutenClickhouseSqlBaseParser(tokenStream)
-    parser.addParseListener(PostProcessor)
+    val parser = new GlutenCacheFileSqlBaseParser(tokenStream)
+
+    parser.addParseListener(GlutenCacheFileSqlPostProcessor)
     parser.removeErrorListeners()
     parser.addErrorListener(ParseErrorListener)
 
@@ -80,33 +78,34 @@ trait GlutenClickhouseSqlParserBase extends ParserInterface 
{
           message = e.message,
           start = position,
           stop = position,
-          errorClass = Some("GLUTEN_CH_PARSING_ANALYSIS_ERROR"))
+          errorClass = Some("GLUTEN_CACHE_FILE_PARSING_ANALYSIS_ERROR"))
     }
   }
 }
 
-class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[AnyRef] {
-
+class GlutenCacheFileSqlAstBuilder extends 
GlutenCacheFileSqlBaseBaseVisitor[AnyRef] {
   import org.apache.spark.sql.catalyst.parser.ParserUtils._
 
   /** Convert a property list into a key-value map. */
-  override def visitPropertyList(ctx: PropertyListContext): Map[String, 
String] = withOrigin(ctx) {
-    val properties = ctx.property.asScala.map {
-      property =>
-        val key = visitPropertyKey(property.key)
-        val value = visitPropertyValue(property.value)
-        key -> value
+  override def visitPropertyList(
+      ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String, 
String] =
+    withOrigin(ctx) {
+      val properties = ctx.property.asScala.map {
+        property =>
+          val key = visitPropertyKey(property.key)
+          val value = visitPropertyValue(property.value)
+          key -> value
+      }
+      // Check for duplicate property names.
+      checkDuplicateKeys(properties.toSeq, ctx)
+      properties.toMap
     }
-    // Check for duplicate property names.
-    checkDuplicateKeys(properties.toSeq, ctx)
-    properties.toMap
-  }
 
   /**
    * A property key can either be String or a collection of dot separated 
elements. This function
    * extracts the property key based on whether its a string literal or a 
property identifier.
    */
-  override def visitPropertyKey(key: PropertyKeyContext): String = {
+  override def visitPropertyKey(key: 
GlutenCacheFileSqlBaseParser.PropertyKeyContext): String = {
     if (key.stringLit() != null) {
       string(visitStringLit(key.stringLit()))
     } else {
@@ -118,7 +117,8 @@ class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[A
    * A property value can be String, Integer, Boolean or Decimal. This 
function extracts the
    * property value based on whether its a string, integer, boolean or decimal 
literal.
    */
-  override def visitPropertyValue(value: PropertyValueContext): String = {
+  override def visitPropertyValue(
+      value: GlutenCacheFileSqlBaseParser.PropertyValueContext): String = {
     if (value == null) {
       null
     } else if (value.identifier != null) {
@@ -132,7 +132,8 @@ class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[A
     }
   }
 
-  def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+  def visitPropertyKeyValues(
+      ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String, 
String] = {
     val props = visitPropertyList(ctx)
     val badKeys = props.collect { case (key, null) => key }
     if (badKeys.nonEmpty) {
@@ -143,7 +144,7 @@ class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[A
     props
   }
 
-  override def visitStringLit(ctx: StringLitContext): Token = {
+  override def visitStringLit(ctx: 
GlutenCacheFileSqlBaseParser.StringLitContext): Token = {
     if (ctx != null) {
       if (ctx.STRING != null) {
         ctx.STRING.getSymbol
@@ -156,58 +157,32 @@ class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[A
   }
 
   override def visitSingleStatement(
-      ctx: GlutenClickhouseSqlBaseParser.SingleStatementContext): AnyRef = 
withOrigin(ctx) {
+      ctx: GlutenCacheFileSqlBaseParser.SingleStatementContext): AnyRef = 
withOrigin(ctx) {
     visit(ctx.statement).asInstanceOf[LogicalPlan]
   }
 
-  override def visitCacheData(ctx: 
GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef =
+  override def visitCacheFiles(ctx: 
GlutenCacheFileSqlBaseParser.CacheFilesContext): AnyRef =
     withOrigin(ctx) {
-      val onlyMetaCache = ctx.META != null
       val asynExecute = ctx.ASYNC != null
-      val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null) 
{
-        if (ctx.filter.TIMESTAMP != null) {
-          (Some(string(ctx.filter.timestamp)), None, None)
-        } else if (ctx.filter.datepartition != null && ctx.filter.datetime != 
null) {
-          (None, Some(ctx.filter.datepartition.getText), 
Some(string(ctx.filter.datetime)))
-        } else {
-          throw new ParseException(s"Illegal filter value ${ctx.getText}", ctx)
-        }
-      } else {
-        (None, None, None)
-      }
       val selectedColuman = visitSelectedColumnNames(ctx.selectedColumns)
-      val tablePropertyOverrides = Option(ctx.cacheProps)
+      val propertyOverrides = Option(ctx.cacheProps)
         .map(visitPropertyKeyValues)
         .getOrElse(Map.empty[String, String])
+      val path = ctx.path.getText
 
-      GlutenCHCacheDataCommand(
-        onlyMetaCache,
+      GlutenCacheFilesCommand(
         asynExecute,
         selectedColuman,
-        Option(ctx.path).map(string),
-        Option(ctx.table).map(visitTableIdentifier),
-        tsfilter,
-        partitionColumn,
-        partitionValue,
-        tablePropertyOverrides
+        path.substring(1, path.length - 1),
+        propertyOverrides
       )
     }
 
-  override def visitPassThrough(ctx: 
GlutenClickhouseSqlBaseParser.PassThroughContext): AnyRef =
+  override def visitPassThrough(ctx: 
GlutenCacheFileSqlBaseParser.PassThroughContext): AnyRef =
     null
 
-  protected def visitTableIdentifier(ctx: QualifiedNameContext): 
TableIdentifier = withOrigin(ctx) {
-    ctx.identifier.asScala.toSeq match {
-      case Seq(tbl) => TableIdentifier(tbl.getText)
-      case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
-      // TODO: Spark 3.5 supports catalog parameter
-      // case Seq(catalog, db, tbl) =>
-      //   TableIdentifier(tbl.getText, Some(db.getText), 
Some(catalog.getText))
-      case _ => throw new ParseException(s"Illegal table name ${ctx.getText}", 
ctx)
-    }
-  }
-
-  override def visitSelectedColumnNames(ctx: SelectedColumnNamesContext): 
Option[Seq[String]] =
+  override def visitSelectedColumnNames(
+      ctx: GlutenCacheFileSqlBaseParser.SelectedColumnNamesContext): 
Option[Seq[String]] =
     withOrigin(ctx) {
       if (ctx != null) {
         if (ctx.ASTERISK != null) {
@@ -224,10 +199,11 @@ class GlutenClickhouseSqlAstBuilder extends 
GlutenClickhouseSqlBaseBaseVisitor[A
     }
 }
 
-case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener {
+case object GlutenCacheFileSqlPostProcessor extends 
GlutenCacheFileSqlBaseBaseListener {
 
   /** Remove the back ticks from an Identifier. */
-  override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {
+  override def exitQuotedIdentifier(
+      ctx: GlutenCacheFileSqlBaseParser.QuotedIdentifierContext): Unit = {
     replaceTokenByIdentifier(ctx, 1) {
       token =>
         // Remove the double back ticks in the string.
@@ -237,7 +213,7 @@ case object PostProcessor extends 
GlutenClickhouseSqlBaseBaseListener {
   }
 
   /** Treat non-reserved keywords as Identifiers. */
-  override def exitNonReserved(ctx: NonReservedContext): Unit = {
+  override def exitNonReserved(ctx: 
GlutenCacheFileSqlBaseParser.NonReservedContext): Unit = {
     replaceTokenByIdentifier(ctx, 0)(identity)
   }
 
@@ -248,7 +224,7 @@ case object PostProcessor extends 
GlutenClickhouseSqlBaseBaseListener {
     val token = ctx.getChild(0).getPayload.asInstanceOf[Token]
     val newToken = new CommonToken(
       new org.antlr.v4.runtime.misc.Pair(token.getTokenSource, 
token.getInputStream),
-      GlutenClickhouseSqlBaseParser.IDENTIFIER,
+      GlutenCacheFileSqlBaseParser.IDENTIFIER,
       token.getChannel,
       token.getStartIndex + stripMargins,
       token.getStopIndex - stripMargins
@@ -256,21 +232,3 @@ case object PostProcessor extends 
GlutenClickhouseSqlBaseBaseListener {
     parent.addChild(new TerminalNodeImpl(f(newToken)))
   }
 }
-
-class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
-  override def consume(): Unit = wrapped.consume
-  override def getSourceName(): String = wrapped.getSourceName
-  override def index(): Int = wrapped.index
-  override def mark(): Int = wrapped.mark
-  override def release(marker: Int): Unit = wrapped.release(marker)
-  override def seek(where: Int): Unit = wrapped.seek(where)
-  override def size(): Int = wrapped.size
-
-  override def getText(interval: Interval): String = wrapped.getText(interval)
-
-  override def LA(i: Int): Int = {
-    val la = wrapped.LA(i)
-    if (la == 0 || la == IntStream.EOF) la
-    else Character.toUpperCase(la)
-  }
-}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
index 18fc102be..4a3883c8c 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.VariableSubstitution
 
 import org.antlr.v4.runtime._
 import org.antlr.v4.runtime.atn.PredictionMode
-import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.misc.ParseCancellationException
 import org.antlr.v4.runtime.tree.TerminalNodeImpl
 
 import java.util.Locale
@@ -256,21 +256,3 @@ case object PostProcessor extends 
GlutenClickhouseSqlBaseBaseListener {
     parent.addChild(new TerminalNodeImpl(f(newToken)))
   }
 }
-
-class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
-  override def consume(): Unit = wrapped.consume
-  override def getSourceName(): String = wrapped.getSourceName
-  override def index(): Int = wrapped.index
-  override def mark(): Int = wrapped.mark
-  override def release(marker: Int): Unit = wrapped.release(marker)
-  override def seek(where: Int): Unit = wrapped.seek(where)
-  override def size(): Int = wrapped.size
-
-  override def getText(interval: Interval): String = wrapped.getText(interval)
-
-  override def LA(i: Int): Int = {
-    val la = wrapped.LA(i)
-    if (la == 0 || la == IntStream.EOF) la
-    else Character.toUpperCase(la)
-  }
-}
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
new file mode 100644
index 000000000..6ee2aac81
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.antlr.v4.runtime.{CharStream, CodePointCharStream, IntStream}
+import org.antlr.v4.runtime.misc.Interval
+
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+  override def consume(): Unit = wrapped.consume
+  override def getSourceName(): String = wrapped.getSourceName
+  override def index(): Int = wrapped.index
+  override def mark(): Int = wrapped.mark
+  override def release(marker: Int): Unit = wrapped.release(marker)
+  override def seek(where: Int): Unit = wrapped.seek(where)
+  override def size(): Int = wrapped.size
+
+  override def getText(interval: Interval): String = wrapped.getText(interval)
+
+  override def LA(i: Int): Int = {
+    val la = wrapped.LA(i)
+    if (la == 0 || la == IntStream.EOF) la
+    else Character.toUpperCase(la)
+  }
+}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 8a3bde235..7f2b94eea 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -79,9 +79,20 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
           context.reply(
             CacheJobInfo(status = false, "", s"executor: $executorId cache 
data failed."))
       }
-    case GlutenMergeTreeCacheLoadStatus(jobId) =>
+    case GlutenCacheLoadStatus(jobId) =>
       val status = CHNativeCacheManager.getCacheStatus(jobId)
       context.reply(status)
+    case GlutenFilesCacheLoad(files) =>
+      try {
+        val jobId = CHNativeCacheManager.nativeCacheFiles(files)
+        context.reply(CacheJobInfo(status = true, jobId))
+      } catch {
+        case e: Exception =>
+          context.reply(
+            CacheJobInfo(
+              status = false,
+              s"executor: $executorId cache data failed. ${e.getMessage}"))
+      }
     case e =>
       logError(s"Received unexpected message. $e")
   }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index 800b15b99..e596e94fe 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -39,8 +39,12 @@ object GlutenRpcMessages {
   case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
     extends GlutenRpcMessage
 
-  case class GlutenMergeTreeCacheLoadStatus(jobId: String)
+  case class GlutenCacheLoadStatus(jobId: String)
 
   case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
     extends GlutenRpcMessage
+
+  case class GlutenFilesCacheLoad(files: Array[Byte]) extends GlutenRpcMessage
+
+  case class GlutenFilesCacheLoadStatus(jobId: String)
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index f32d22d5e..bb3cb5acc 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -16,24 +16,20 @@
  */
 package org.apache.spark.sql.execution.commands
 
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.CacheResult
-import org.apache.gluten.execution.CacheResult.Status
 import org.apache.gluten.expression.ConverterUtils
 import org.apache.gluten.substrait.rel.ExtensionTableBuilder
 
 import org.apache.spark.affinity.CHAffinity
 import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad, GlutenMergeTreeCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenMergeTreeCacheLoad}
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.{checkExecutorId, collectJobTriggerResult, toExecutorId, waitAllJobFinish, waitRpcResults}
+import org.apache.spark.sql.execution.commands.GlutenCacheBase._
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.{BooleanType, StringType}
-import org.apache.spark.util.ThreadUtils
 
 import org.apache.hadoop.fs.Path
 
@@ -43,7 +39,6 @@ import java.util.{ArrayList => JList}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.Future
-import scala.concurrent.duration.Duration
 
 case class GlutenCHCacheDataCommand(
     onlyMetaCache: Boolean,
@@ -140,9 +135,7 @@ case class GlutenCHCacheDataCommand(
     val executorIdsToAddFiles =
       scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]()
     val executorIdsToParts = scala.collection.mutable.Map[String, String]()
-    executorIdsToAddFiles.put(
-      GlutenCHCacheDataCommand.ALL_EXECUTORS,
-      new ArrayBuffer[AddMergeTreeParts]())
+    executorIdsToAddFiles.put(ALL_EXECUTORS, new 
ArrayBuffer[AddMergeTreeParts]())
     selectedAddFiles.foreach(
       addFile => {
         val mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts]
@@ -156,7 +149,7 @@ case class GlutenCHCacheDataCommand(
 
         if (locations.isEmpty) {
           // non soft affinity
-          executorIdsToAddFiles(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+          executorIdsToAddFiles(ALL_EXECUTORS)
             .append(mergeTreePart)
         } else {
           locations.foreach(
@@ -205,9 +198,9 @@ case class GlutenCHCacheDataCommand(
         }
       })
     val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
-    if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) {
+    if (executorIdsToParts.contains(ALL_EXECUTORS)) {
       // send all parts to all executors
-      val tableMessage = 
executorIdsToParts(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+      val tableMessage = executorIdsToParts(ALL_EXECUTORS)
       GlutenDriverEndpoint.executorDataMap.forEach(
         (executorId, executor) => {
           futureList.append(
@@ -230,86 +223,7 @@ case class GlutenCHCacheDataCommand(
               )))
         })
     }
-    val resultList = waitRpcResults(futureList)
-    if (asynExecute) {
-      val res = collectJobTriggerResult(resultList)
-      Seq(Row(res._1, res._2.mkString(";")))
-    } else {
-      val res = waitAllJobFinish(resultList)
-      Seq(Row(res._1, res._2))
-    }
-  }
-
-}
-
-object GlutenCHCacheDataCommand {
-  private val ALL_EXECUTORS = "allExecutors"
-
-  private def toExecutorId(executorId: String): String =
-    executorId.split("_").last
-
-  def waitAllJobFinish(jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, 
String) = {
-    val res = collectJobTriggerResult(jobs)
-    var status = res._1
-    val messages = res._2
-    jobs.foreach(
-      job => {
-        if (status) {
-          var complete = false
-          while (!complete) {
-            Thread.sleep(5000)
-            val future_result = GlutenDriverEndpoint.executorDataMap
-              .get(toExecutorId(job._1))
-              .executorEndpointRef
-              .ask[CacheResult](GlutenMergeTreeCacheLoadStatus(job._2.jobId))
-            val result = ThreadUtils.awaitResult(future_result, Duration.Inf)
-            result.getStatus match {
-              case Status.ERROR =>
-                status = false
-                messages.append(
-                  s"executor : {}, failed with message: {};",
-                  job._1,
-                  result.getMessage)
-                complete = true
-              case Status.SUCCESS =>
-                complete = true
-              case _ =>
-              // still running
-            }
-          }
-        }
-      })
-    (status, messages.mkString(";"))
-  }
-
-  private def collectJobTriggerResult(jobs: ArrayBuffer[(String, 
CacheJobInfo)]) = {
-    var status = true
-    val messages = ArrayBuffer[String]()
-    jobs.foreach(
-      job => {
-        if (!job._2.status) {
-          messages.append(job._2.reason)
-          status = false
-        }
-      })
-    (status, messages)
-  }
 
-  private def waitRpcResults = (futureList: ArrayBuffer[(String, 
Future[CacheJobInfo])]) => {
-    val resultList = ArrayBuffer[(String, CacheJobInfo)]()
-    futureList.foreach(
-      f => {
-        resultList.append((f._1, ThreadUtils.awaitResult(f._2, Duration.Inf)))
-      })
-    resultList
+    getResult(futureList, asynExecute)
   }
-
-  private def checkExecutorId(executorId: String): Unit = {
-    if 
(!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
-      throw new GlutenException(
-        s"executor $executorId not found," +
-          s" all executors are 
${GlutenDriverEndpoint.executorDataMap.toString}")
-    }
-  }
-
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
new file mode 100644
index 000000000..c4e9f51bc
--- /dev/null
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.execution.CacheResult
+import org.apache.gluten.execution.CacheResult.Status
+
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenCacheLoadStatus}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+object GlutenCacheBase {
+  def ALL_EXECUTORS: String = "allExecutors"
+
+  def toExecutorId(executorId: String): String =
+    executorId.split("_").last
+
+  protected def waitRpcResults
+      : ArrayBuffer[(String, Future[CacheJobInfo])] => ArrayBuffer[(String, 
CacheJobInfo)] =
+    (futureList: ArrayBuffer[(String, Future[CacheJobInfo])]) => {
+      val resultList = ArrayBuffer[(String, CacheJobInfo)]()
+      futureList.foreach(
+        f => {
+          resultList.append((f._1, ThreadUtils.awaitResult(f._2, 
Duration.Inf)))
+        })
+      resultList
+    }
+
+  def checkExecutorId(executorId: String): Unit = {
+    if 
(!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
+      throw new GlutenException(
+        s"executor $executorId not found," +
+          s" all executors are 
${GlutenDriverEndpoint.executorDataMap.toString}")
+    }
+  }
+
+  def waitAllJobFinish(
+      jobs: ArrayBuffer[(String, CacheJobInfo)],
+      ask: (String, String) => Future[CacheResult]): (Boolean, String) = {
+    val res = collectJobTriggerResult(jobs)
+    var status = res._1
+    val messages = res._2
+    jobs.foreach(
+      job => {
+        if (status) {
+          var complete = false
+          while (!complete) {
+            Thread.sleep(5000)
+            val future_result = ask(job._1, job._2.jobId)
+            val result = ThreadUtils.awaitResult(future_result, Duration.Inf)
+            result.getStatus match {
+              case Status.ERROR =>
+                status = false
+                messages.append(
+                  s"executor : {}, failed with message: {};",
+                  job._1,
+                  result.getMessage)
+                complete = true
+              case Status.SUCCESS =>
+                complete = true
+              case _ =>
+              // still running
+            }
+          }
+        }
+      })
+    (status, messages.mkString(";"))
+  }
+
+  def collectJobTriggerResult(
+      jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean, 
ArrayBuffer[String]) = {
+    var status = true
+    val messages = ArrayBuffer[String]()
+    jobs.foreach(
+      job => {
+        if (!job._2.status) {
+          messages.append(job._2.reason)
+          status = false
+        }
+      })
+    (status, messages)
+  }
+
+  def getResult(
+      futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
+      async: Boolean): Seq[Row] = {
+    val resultList = waitRpcResults(futureList)
+    if (async) {
+      val res = collectJobTriggerResult(resultList)
+      Seq(Row(res._1, res._2.mkString(";")))
+    } else {
+      val fetchStatus: (String, String) => Future[CacheResult] =
+        (executorId: String, jobId: String) => {
+          GlutenDriverEndpoint.executorDataMap
+            .get(toExecutorId(executorId))
+            .executorEndpointRef
+            .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+        }
+      val res = waitAllJobFinish(resultList, fetchStatus)
+      Seq(Row(res._1, res._2))
+    }
+  }
+}
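
For context on the helpers above: getResult surfaces a single row of (result: Boolean, reason: String). When the command is asynchronous the row only reports whether the cache jobs were triggered; otherwise the driver polls each executor every 5 seconds until every job ends in SUCCESS or ERROR. A minimal sketch of consuming that row, assuming an active SparkSession named spark (the HDFS path is a placeholder, and the SQL form mirrors the test added further down):

    val row = spark.sql("CACHE FILES select * from 'hdfs://ns1/tpch-data/lineitem'").collect().head
    val succeeded = row.getBoolean(0)   // all cache jobs triggered/finished successfully
    val reason    = row.getString(1)    // failure messages joined with ';' when not
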
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
new file mode 100644
index 000000000..0a08df7ce
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.substrait.rel.LocalFilesBuilder
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+
+import org.apache.spark.affinity.CHAffinity
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, 
GlutenFilesCacheLoad}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.commands.GlutenCacheBase._
+import org.apache.spark.sql.types.{BooleanType, StringType}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+import java.io.FileNotFoundException
+import java.lang.{Long => JLong}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+
+case class GlutenCacheFilesCommand(
+    async: Boolean,
+    selectedColumn: Option[Seq[String]],
+    filePath: String,
+    propertyOverrides: Map[String, String]
+) extends LeafRunnableCommand {
+
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("result", BooleanType, nullable = false)(),
+    AttributeReference("reason", StringType, nullable = false)())
+
+  override def run(session: SparkSession): Seq[Row] = {
+    val targetFile = new Path(filePath)
+    val hadoopConf: Configuration = session.sparkContext.hadoopConfiguration
+    val fs = targetFile.getFileSystem(hadoopConf)
+    if (!fs.exists(targetFile)) {
+      throw new FileNotFoundException(filePath)
+    }
+
+    val recursive =
+      "true".equalsIgnoreCase(propertyOverrides.getOrElse("recursive", "false"))
+
+    val files: Seq[FileStatus] = listFiles(targetFile, recursive, fs)
+    val executorIdsToFiles =
+      scala.collection.mutable.Map[String, ArrayBuffer[FileStatus]]()
+    executorIdsToFiles.put(ALL_EXECUTORS, new ArrayBuffer[FileStatus]())
+
+    files.foreach(
+      fileStatus => {
+        val locations = 
CHAffinity.getHostLocations(fileStatus.getPath.toUri.toASCIIString)
+        if (locations.isEmpty) {
+          executorIdsToFiles(ALL_EXECUTORS).append(fileStatus)
+        } else {
+          locations.foreach(
+            executor => {
+              if (!executorIdsToFiles.contains(executor)) {
+                executorIdsToFiles.put(executor, new ArrayBuffer[FileStatus]())
+              }
+              executorIdsToFiles(executor).append(fileStatus)
+            })
+        }
+      })
+
+    val executorIdsToLocalFiles = executorIdsToFiles
+      .filter(_._2.nonEmpty)
+      .map {
+        case (executorId, fileStatusArray) =>
+          val paths = new JArrayList[String]()
+          val starts = new JArrayList[JLong]()
+          val lengths = new JArrayList[JLong]()
+          val partitionColumns = new JArrayList[JMap[String, String]]
+
+          fileStatusArray.foreach(
+            fileStatus => {
+              paths.add(fileStatus.getPath.toUri.toASCIIString)
+              starts.add(JLong.valueOf(0))
+              lengths.add(JLong.valueOf(fileStatus.getLen))
+              partitionColumns.add(new JHashMap[String, String]())
+            })
+
+          val localFile = LocalFilesBuilder.makeLocalFiles(
+            null,
+            paths,
+            starts,
+            lengths,
+            lengths,
+            new JArrayList[JLong](),
+            partitionColumns,
+            new JArrayList[JMap[String, String]](),
+            ReadFileFormat.ParquetReadFormat, // ignore format in backend
+            new JArrayList[String](),
+            new JHashMap[String, String]()
+          )
+
+          (executorId, localFile)
+      }
+      .toMap
+
+    val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
+    val fileNodeOption = executorIdsToLocalFiles.get(ALL_EXECUTORS)
+    if (fileNodeOption.isDefined) {
+      GlutenDriverEndpoint.executorDataMap.forEach(
+        (executorId, executor) => {
+          futureList.append(
+            (
+              executorId,
+              executor.executorEndpointRef.ask[CacheJobInfo](
+                
GlutenFilesCacheLoad(fileNodeOption.get.toProtobuf.toByteArray))))
+        })
+    } else {
+      executorIdsToLocalFiles.foreach {
+        case (executorId, fileNode) =>
+          checkExecutorId(executorId)
+          val executor = 
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(executorId))
+          futureList.append(
+            (
+              executorId,
+              executor.executorEndpointRef.ask[CacheJobInfo](
+                GlutenFilesCacheLoad(fileNode.toProtobuf.toByteArray))))
+      }
+    }
+
+    getResult(futureList, async)
+  }
+
+  private def listFiles(targetFile: Path, recursive: Boolean, fs: FileSystem): 
Seq[FileStatus] = {
+    val dirContents = fs
+      .listStatus(targetFile)
+      .flatMap(f => addInputPathRecursively(fs, f, recursive))
+      .filter(isNonEmptyDataFile)
+      .toSeq
+    dirContents
+  }
+
+  private def addInputPathRecursively(
+      fs: FileSystem,
+      files: FileStatus,
+      recursive: Boolean): Seq[FileStatus] = {
+    if (files.isFile) {
+      Seq(files)
+    } else if (recursive) {
+      fs.listStatus(files.getPath)
+        .flatMap(
+          file => {
+            if (file.isFile) {
+              Seq(file)
+            } else {
+              addInputPathRecursively(fs, file, recursive)
+            }
+          })
+    } else {
+      Seq()
+    }
+  }
+
+  private def isNonEmptyDataFile(f: FileStatus): Boolean = {
+    if (!f.isFile || f.getLen == 0) {
+      false
+    } else {
+      val name = f.getPath.getName
+      !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+    }
+  }
+}
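
Besides the SQL form exercised in the HDFS suite below, the command can also be constructed directly. A minimal sketch, assuming an active SparkSession; the path and the "recursive" property value are placeholders, not values required by this change:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.commands.GlutenCacheFilesCommand

    def warmUpCache(spark: SparkSession, path: String): Unit = {
      // async = true returns once the cache jobs have been triggered on the executors;
      // "recursive" is read from propertyOverrides in run() to descend into sub-directories.
      GlutenCacheFilesCommand(
        async = true,
        selectedColumn = None,
        filePath = path,
        propertyOverrides = Map("recursive" -> "true")
      ).run(spark)
    }
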
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index f914eaa18..dfc5fbd3b 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -41,6 +41,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends 
WholeStageTransformerSu
     version(0) + "." + version(1)
   }
 
+  val CH_CONFIG_PREFIX: String = 
"spark.gluten.sql.columnar.backend.ch.runtime_config"
+  val CH_SETTING_PREFIX: String = 
"spark.gluten.sql.columnar.backend.ch.runtime_settings"
+
   val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
   val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
   val S3_ENDPOINT = "s3://127.0.0.1:9000/"
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
new file mode 100644
index 000000000..ed8dcbbc1
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution.{CHNativeCacheManager, 
FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+import org.apache.hadoop.fs.Path
+
+class GlutenClickHouseHDFSSuite
+  extends GlutenClickHouseTPCHAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data"
+  override protected val tpchQueries: String =
+    rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
+  override protected val queriesResults: String = rootPath + "queries-output"
+
+  private val hdfsCachePath = "/tmp/gluten_hdfs_cache/"
+  private val cache_name = "gluten_cache"
+
+  /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "sort")
+      .set("spark.io.compression.codec", "snappy")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set(s"$CH_CONFIG_PREFIX.use_local_format", "true")
+      .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", 
"sparkMurmurHash3_32")
+      .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true")
+      .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name)
+      .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath)
+      .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi")
+      .set(s"$CH_CONFIG_PREFIX.reuse_disk_cache", "false")
+      .set("spark.sql.adaptive.enabled", "false")
+  }
+
+  override protected def createTPCHNotNullTables(): Unit = {
+    createNotNullTPCHTablesInParquet(tablesPath)
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    deleteCache()
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    deleteCache()
+  }
+
+  private def deleteCache(): Unit = {
+    val targetFile = new Path(tablesPath)
+    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.listStatus(targetFile)
+      .foreach(
+        table => {
+          if (table.isDirectory) {
+            fs.listStatus(table.getPath)
+              .foreach(
+                data => {
+                  if (data.isFile) {
+                    CHNativeCacheManager
+                      .removeFiles(data.getPath.toUri.getPath.substring(1), 
cache_name)
+                  }
+                })
+          }
+        })
+    clearDataPath(hdfsCachePath)
+  }
+
+  val runWithoutCache: () => Unit = () => {
+    runTPCHQuery(6) {
+      df =>
+        val plans = df.queryExecution.executedPlan.collect {
+          case scanExec: FileSourceScanExecTransformer => scanExec
+        }
+        assert(plans.size == 1)
+        assert(plans.head.metrics("readMissBytes").value != 0)
+    }
+  }
+
+  val runWithCache: () => Unit = () => {
+    runTPCHQuery(6) {
+      df =>
+        val plans = df.queryExecution.executedPlan.collect {
+          case scanExec: FileSourceScanExecTransformer => scanExec
+        }
+        assert(plans.size == 1)
+        assert(plans.head.metrics("readMissBytes").value == 0)
+        assert(plans.head.metrics("readCacheBytes").value != 0)
+    }
+  }
+
+  test("test hdfs cache") {
+    runWithoutCache()
+    runWithCache()
+  }
+
+  test("test cache file command") {
+    runSql(
+      s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
+      noFallBack = false) { _ => }
+    runWithCache()
+  }
+
+  test("test no cache by query") {
+    withSQLConf(
+      
s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache"
 -> "true") {
+      runWithoutCache()
+    }
+
+    runWithoutCache()
+    runWithCache()
+  }
+}
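
Outside the test harness, the same local-cache knobs the suite sets can be applied to a regular job through SparkConf. A minimal sketch; the cache name, path and size are illustrative values, not defaults mandated by this commit:

    import org.apache.spark.SparkConf

    val chConf = "spark.gluten.sql.columnar.backend.ch.runtime_config"
    val conf = new SparkConf()
      .set(s"$chConf.gluten_cache.local.enabled", "true")
      .set(s"$chConf.gluten_cache.local.name", "gluten_cache")
      .set(s"$chConf.gluten_cache.local.path", "/tmp/gluten_hdfs_cache/")
      .set(s"$chConf.gluten_cache.local.max_size", "10Gi")
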
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 8e07eea01..9558bf957 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -573,6 +573,18 @@ std::vector<String> 
BackendInitializerUtil::wrapDiskPathConfig(
     std::vector<String> changed_paths;
     if (path_prefix.empty() && path_suffix.empty())
         return changed_paths;
+
+    auto change_func = [&](String key) -> void
+    {
+        if (const String value = config.getString(key, ""); value != "")
+        {
+            const String change_value = path_prefix + value + path_suffix;
+            config.setString(key, change_value);
+            changed_paths.emplace_back(change_value);
+            LOG_INFO(getLogger("BackendInitializerUtil"), "Change config `{}` 
from '{}' to {}.", key, value, change_value);
+        }
+    };
+
     Poco::Util::AbstractConfiguration::Keys disks;
     std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", 
"cache"};
     config.keys("storage_configuration.disks", disks);
@@ -586,26 +598,14 @@ std::vector<String> 
BackendInitializerUtil::wrapDiskPathConfig(
             if (!disk_types.contains(disk_type))
                 return;
             if (disk_type == "cache")
-            {
-                String path = config.getString(disk_prefix + ".path", "");
-                if (!path.empty())
-                {
-                    String final_path = path_prefix + path + path_suffix;
-                    config.setString(disk_prefix + ".path", final_path);
-                    changed_paths.emplace_back(final_path);
-                }
-            }
+                change_func(disk_prefix + ".path");
             else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
-            {
-                String metadata_path = config.getString(disk_prefix + 
".metadata_path", "");
-                if (!metadata_path.empty())
-                {
-                    String final_path = path_prefix + metadata_path + 
path_suffix;
-                    config.setString(disk_prefix + ".metadata_path", 
final_path);
-                    changed_paths.emplace_back(final_path);
-                }
-            }
+                change_func(disk_prefix + ".metadata_path");
         });
+
+    change_func("path");
+    change_func("gluten_cache.local.path");
+
     return changed_paths;
 }
 
diff --git a/cpp-ch/local-engine/Common/CHUtil.h 
b/cpp-ch/local-engine/Common/CHUtil.h
index c91b7264d..a92155d14 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -195,6 +195,8 @@ public:
 
     inline static const String GLUTEN_TASK_OFFHEAP = 
"spark.gluten.memory.task.offHeap.size.in.bytes";
 
+    inline static const String GLUTEN_LOCAL_CACHE_PREFIX = 
"gluten_cache.local.";
+
     /// On yarn mode, native writing on hdfs cluster takes yarn container user 
as the user passed to libhdfs3, which
     /// will cause permission issue because yarn container user is not the 
owner of the hdfs dir to be written.
     /// So we need to get the spark user from env and pass it to libhdfs3.
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h 
b/cpp-ch/local-engine/Common/GlutenConfig.h
index d0e2e9dee..38c4ce162 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -20,6 +20,7 @@
 #include <Interpreters/Context.h>
 #include <base/types.h>
 #include <base/unit.h>
+#include <Common/logger_useful.h>
 
 namespace local_engine
 {
@@ -134,13 +135,17 @@ struct HdfsConfig
 {
     inline static const String HDFS_ASYNC = "hdfs.enable_async_io";
 
-    bool hdfs_async = true;
+    bool hdfs_async;
 
-    static HdfsConfig loadFromContext(DB::ContextPtr context)
+    static HdfsConfig loadFromContext(const Poco::Util::AbstractConfiguration 
& config, const DB::ReadSettings & read_settings)
     {
-        HdfsConfig config;
-        config.hdfs_async = context->getConfigRef().getBool(HDFS_ASYNC, true);
-        return config;
+        HdfsConfig hdfs;
+        if (read_settings.enable_filesystem_cache)
+            hdfs.hdfs_async = false;
+        else
+            hdfs.hdfs_async = config.getBool(HDFS_ASYNC, true);
+
+        return hdfs;
     }
 };
 
@@ -159,10 +164,17 @@ struct S3Config
     static S3Config loadFromContext(DB::ContextPtr context)
     {
         S3Config config;
-        config.s3_local_cache_enabled = 
context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
-        config.s3_local_cache_max_size = 
context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
-        config.s3_local_cache_cache_path = 
context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
-        config.s3_gcs_issue_compose_request = 
context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+
+        if (context->getConfigRef().has(S3_LOCAL_CACHE_ENABLE))
+        {
+            LOG_WARNING(&Poco::Logger::get("S3Config"), "Config {} is deprecated.", S3_LOCAL_CACHE_ENABLE);
+
+            config.s3_local_cache_enabled = 
context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
+            config.s3_local_cache_max_size = 
context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
+            config.s3_local_cache_cache_path = 
context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
+            config.s3_gcs_issue_compose_request = 
context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+        }
+
         return config;
     }
 };
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp 
b/cpp-ch/local-engine/Common/QueryContext.cpp
index e5f5dd5dc..ff9c15115 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -77,14 +77,19 @@ int64_t QueryContextManager::initializeQuery()
 
 DB::ContextMutablePtr QueryContextManager::currentQueryContext()
 {
-    if (!CurrentThread::getGroup())
-    {
-        throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not 
found.");
-    }
+    auto thread_group = currentThreadGroup();
     int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
     return query_map.get(id)->query_context;
 }
 
+std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
+{
+    if (auto thread_group = CurrentThread::getGroup())
+        return thread_group;
+
+    throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
+}
+
 void 
QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters & 
counters)
 {
     if (!CurrentThread::getGroup())
diff --git a/cpp-ch/local-engine/Common/QueryContext.h 
b/cpp-ch/local-engine/Common/QueryContext.h
index 0fbf49773..4770327d1 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -30,6 +30,7 @@ public:
     }
     int64_t initializeQuery();
     DB::ContextMutablePtr currentQueryContext();
+    static std::shared_ptr<DB::ThreadGroup> currentThreadGroup();
     void logCurrentPerformanceCounters(ProfileEvents::Counters& counters);
     size_t currentPeakMemory(int64_t id);
     void finalizeQuery(int64_t id);
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp 
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 7b8b4cfd9..e13864260 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -15,15 +15,63 @@
  * limitations under the License.
  */
 #include "RelMetric.h"
+
 #include <Processors/IProcessor.h>
 #include <Processors/QueryPlan/AggregatingStep.h>
 #include <Processors/QueryPlan/ReadFromMergeTree.h>
+#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
+#include <Common/QueryContext.h>
 
 using namespace rapidjson;
 
+namespace ProfileEvents
+{
+extern const Event FileSegmentWaitReadBufferMicroseconds;
+extern const Event FileSegmentReadMicroseconds;
+extern const Event FileSegmentCacheWriteMicroseconds;
+extern const Event FileSegmentPredownloadMicroseconds;
+extern const Event FileSegmentUsedBytes;
+
+extern const Event CachedReadBufferReadFromSourceMicroseconds;
+extern const Event CachedReadBufferReadFromCacheMicroseconds;
+extern const Event CachedReadBufferCacheWriteMicroseconds;
+extern const Event CachedReadBufferReadFromSourceBytes;
+extern const Event CachedReadBufferReadFromCacheBytes;
+extern const Event CachedReadBufferCacheWriteBytes;
+extern const Event CachedReadBufferCreateBufferMicroseconds;
+
+extern const Event CachedReadBufferReadFromCacheHits;
+extern const Event CachedReadBufferReadFromCacheMisses;
+}
+
 namespace local_engine
 {
 
+static void writeCacheHits(Writer<StringBuffer> & writer)
+{
+    const auto thread_group = QueryContextManager::currentThreadGroup();
+    auto & counters = thread_group->performance_counters;
+    auto read_cache_hits = 
counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load();
+    auto miss_cache_hits = 
counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load();
+    auto read_cache_bytes = 
counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load();
+    auto read_miss_bytes = 
counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load();
+    auto read_cache_millisecond = 
counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 
1000;
+    auto miss_cache_millisecond = 
counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 
1000;
+
+    writer.Key("read_cache_hits");
+    writer.Uint64(read_cache_hits);
+    writer.Key("miss_cache_hits");
+    writer.Uint64(miss_cache_hits);
+    writer.Key("read_cache_bytes");
+    writer.Uint64(read_cache_bytes);
+    writer.Key("read_miss_bytes");
+    writer.Uint64(read_miss_bytes);
+    writer.Key("read_cache_millisecond");
+    writer.Uint64(read_cache_millisecond);
+    writer.Key("miss_cache_millisecond");
+    writer.Uint64(miss_cache_millisecond);
+}
+
 RelMetric::RelMetric(size_t id_, const String & name_, 
std::vector<DB::IQueryPlanStep *> & steps_) : id(id_), name(name_), 
steps(steps_)
 {
 }
@@ -117,7 +165,7 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, 
bool) const
             }
             writer.EndArray();
 
-            if (auto read_mergetree = 
dynamic_cast<DB::ReadFromMergeTree*>(step))
+            if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree 
*>(step))
             {
                 auto selected_marks_pk = 
read_mergetree->getAnalysisResult().selected_marks_pk;
                 auto selected_marks = 
read_mergetree->getAnalysisResult().selected_marks;
@@ -128,6 +176,11 @@ void RelMetric::serialize(Writer<StringBuffer> & writer, 
bool) const
                 writer.Uint64(selected_marks);
                 writer.Key("total_marks_pk");
                 writer.Uint64(total_marks_pk);
+                writeCacheHits(writer);
+            }
+            else if (dynamic_cast<SubstraitFileSourceStep *>(step))
+            {
+                writeCacheHits(writer);
             }
 
             writer.EndObject();
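
The counters written by writeCacheHits() above are picked up on the Spark side (FileSourceScanMetricsUpdater in this patch) and show up as SQL metrics on the scan node; the HDFS suite later asserts on readCacheBytes and readMissBytes. A rough sketch of reading them from a finished query, given a materialized DataFrame df running on the ClickHouse backend:

    import org.apache.gluten.execution.FileSourceScanExecTransformer

    val scan = df.queryExecution.executedPlan.collect {
      case s: FileSourceScanExecTransformer => s
    }.head
    val bytesFromCache  = scan.metrics("readCacheBytes").value
    val bytesFromSource = scan.metrics("readMissBytes").value
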
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index a97f0c72a..0dc852a90 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -16,20 +16,21 @@
  */
 #include "CacheManager.h"
 
+#include <ranges>
 #include <Core/Settings.h>
 #include <Disks/IStoragePolicy.h>
 #include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
-#include <Interpreters/Context.h>
+#include <Interpreters/Cache/FileCache.h>
 #include <Interpreters/Cache/FileCacheFactory.h>
-#include <Storages/Mergetree/MetaDataHelper.h>
-#include <Common/ThreadPool.h>
+#include <Interpreters/Context.h>
 #include <Parser/MergeTreeRelParser.h>
 #include <Processors/Executors/PipelineExecutor.h>
 #include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
 #include <Common/Logger.h>
+#include <Common/ThreadPool.h>
 #include <Common/logger_useful.h>
-#include <ranges>
 
 #include <jni/jni_common.h>
 
@@ -178,4 +179,62 @@ jobject CacheManager::getCacheStatus(JNIEnv * env, const 
String & jobId)
     }
     return env->NewObject(cache_result_class, cache_result_constructor, 
status, charTojstring(env, message.c_str()));
 }
+
+Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles 
& file, ReadBufferBuilderPtr read_buffer_builder)
+{
+    auto task = [file, read_buffer_builder, context = this->context]()
+    {
+        LOG_INFO(getLogger("CacheManager"), "Loading cache file {}", 
file.uri_file());
+
+        try
+        {
+            std::unique_ptr<DB::ReadBuffer> rb = 
read_buffer_builder->build(file);
+            while (!rb->eof())
+                rb->ignoreAll();
+        }
+        catch (std::exception & e)
+        {
+            LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n 
{}", file.uri_file(), e.what());
+            std::rethrow_exception(std::current_exception());
+        }
+    };
+
+    return std::move(task);
+}
+
+JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
+{
+    JobId id = toString(UUIDHelpers::generateV4());
+    Job job(id);
+
+    if (file_infos.items_size())
+    {
+        const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
+        const auto read_buffer_builder = 
ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), 
context);
+
+        if (read_buffer_builder->file_cache)
+            for (const auto & file : file_infos.items())
+                job.addTask(cacheFile(file, read_buffer_builder));
+        else
+            LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache is not enabled.");
+    }
+
+    auto & scheduler = JobScheduler::instance();
+    scheduler.scheduleJob(std::move(job));
+    return id;
+}
+
+void CacheManager::removeFiles(String file, String cache_name)
+{
+    // only used by unit tests
+    for (const auto & [name, file_cache] : 
FileCacheFactory::instance().getAll())
+    {
+        if (name != cache_name)
+            continue;
+
+        if (const auto cache = file_cache->cache)
+            cache->removePathIfExists(file, 
DB::FileCache::getCommonUser().user_id);
+    }
+}
+
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index b88a3ea03..6335f86bb 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -15,9 +15,13 @@
  * limitations under the License.
  */
 #pragma once
+
+#include <substrait/plan.pb.h>
+
 #include <Disks/IDisk.h>
 #include <Storages/Cache/JobScheduler.h>
 #include <jni.h>
+#include <Storages/SubstraitSource/ReadBufferBuilder.h>
 
 namespace local_engine
 {
@@ -40,6 +44,10 @@ public:
     Task cachePart(const MergeTreeTable& table, const MergeTreePart& part, 
const std::unordered_set<String>& columns);
     JobId cacheParts(const String& table_def, const 
std::unordered_set<String>& columns);
     static jobject getCacheStatus(JNIEnv * env, const String& jobId);
+
+    Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, 
ReadBufferBuilderPtr read_buffer_builder);
+    JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos);
+    static void removeFiles(String file, String cache_name);
 private:
     CacheManager() = default;
     DB::ContextMutablePtr context;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index da1589007..b32073db5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -35,7 +35,7 @@
 #include <Interpreters/Cache/FileCache.h>
 #include <Interpreters/Cache/FileCacheFactory.h>
 #include <Interpreters/Cache/FileCacheSettings.h>
-#include <Interpreters/Context_fwd.h>
+#include <Interpreters/Context.h>
 #include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
 #include <Storages/ObjectStorage/HDFS/HDFSCommon.h>
 #include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
@@ -49,7 +49,6 @@
 #include <Common/CHUtil.h>
 #include <Common/FileCacheConcurrentMap.h>
 #include <Common/GlutenConfig.h>
-#include <Common/Throttler.h>
 #include <Common/logger_useful.h>
 #include <Common/safe_cast.h>
 
@@ -77,8 +76,6 @@ namespace ErrorCodes
 }
 }
 
-namespace fs = std::filesystem;
-
 namespace local_engine
 {
 template <class key_type, class value_type>
@@ -205,6 +202,7 @@ public:
 #if USE_HDFS
 class HDFSFileReadBufferBuilder : public ReadBufferBuilder
 {
+    using ReadBufferCreator = 
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek, 
const DB::StoredObject & object)>;
 public:
     explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) : 
ReadBufferBuilder(context_), context(context_) { }
     ~HDFSFileReadBufferBuilder() override = default;
@@ -212,18 +210,21 @@ public:
     std::unique_ptr<DB::ReadBuffer>
     build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position) override
     {
-        auto config = HdfsConfig::loadFromContext(context);
+        DB::ReadSettings read_settings = getReadSettings(context);
+        auto & config = context->getConfigRef();
+        auto hdfs_config = HdfsConfig::loadFromContext(config, read_settings);
         Poco::URI file_uri(file_info.uri_file());
         std::string uri_path = "hdfs://" + file_uri.getHost();
         if (file_uri.getPort())
-            uri_path += ":" + std::to_string(file_uri.getPort());
+            uri_path += ":" + 
std::to_string(static_cast<unsigned>(file_uri.getPort()));
 
-        DB::ReadSettings read_settings;
-        std::unique_ptr<DB::ReadBuffer> read_buffer;
+        size_t read_util_position = 0;
+        size_t read_begin = 0;
         if (set_read_util_position)
         {
             std::pair<size_t, size_t> start_end_pos
-                = adjustFileReadStartAndEndPos(file_info.start(), 
file_info.start() + file_info.length(), uri_path, file_uri.getPath());
+               = adjustFileReadStartAndEndPos(file_info.start(), 
file_info.start() + file_info.length(), uri_path, file_uri.getPath());
+
             LOG_DEBUG(
                 &Poco::Logger::get("ReadBufferBuilder"),
                 "File read start and end position adjusted from {},{} to 
{},{}",
@@ -232,34 +233,57 @@ public:
                 start_end_pos.first,
                 start_end_pos.second);
 
-            auto read_buffer_impl = std::make_unique<DB::ReadBufferFromHDFS>(
-                uri_path, file_uri.getPath(), context->getConfigRef(), 
read_settings, start_end_pos.second, true);
-            if (config.hdfs_async)
-            {
-                auto & pool_reader = 
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
-                read_buffer = 
std::make_unique<DB::AsynchronousReadBufferFromHDFS>(pool_reader, 
read_settings, std::move(read_buffer_impl));
-            }
-            else
-                read_buffer = std::move(read_buffer_impl);
+            read_begin = start_end_pos.first;
+            read_util_position = start_end_pos.second;
+        }
 
-            if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer 
*>(read_buffer.get()))
-                if (start_end_pos.first)
-                    seekable_in->seek(start_end_pos.first, SEEK_SET);
+        size_t file_size = 0;
+        if (file_info.has_properties())
+            file_size = file_info.properties().filesize();
+
+        std::unique_ptr<DB::ReadBuffer> read_buffer;
+
+        if (hdfs_config.hdfs_async)
+        {
+            std::optional<size_t> size = std::nullopt;
+            if (file_size)
+                size = file_size;
+
+            auto read_buffer_impl = std::make_shared<DB::ReadBufferFromHDFS>(
+                            uri_path, file_uri.getPath(), config, 
read_settings, read_util_position, true, size);
+            auto & pool_reader = 
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
+            read_buffer = 
std::make_unique<DB::AsynchronousReadBufferFromHDFS>(pool_reader, 
read_settings, std::move(read_buffer_impl));
         }
         else
         {
-            auto read_buffer_impl
-                = std::make_unique<DB::ReadBufferFromHDFS>(uri_path, 
file_uri.getPath(), context->getConfigRef(), read_settings, 0, true);
-            if (config.hdfs_async)
+            if (!file_size)
             {
-                read_buffer = 
std::make_unique<DB::AsynchronousReadBufferFromHDFS>(
-                    
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER),
-                    read_settings,
-                    std::move(read_buffer_impl));
+                // Spark 3.2 file partitions do not carry the file size, so compute it here first.
+                auto read_buffer_impl = 
std::make_unique<DB::ReadBufferFromHDFS>(
+                    uri_path, file_uri.getPath(), config, read_settings, 
read_util_position, true);
+                file_size = read_buffer_impl->getFileSize();
             }
-            else
-                read_buffer = std::move(read_buffer_impl);
+
+            ReadBufferCreator hdfs_read_buffer_creator
+                = [this, hdfs_uri = uri_path, hdfs_file_path = 
file_uri.getPath(), read_settings, &config, read_util_position](
+                      bool /* restricted_seek */, const DB::StoredObject & 
object) -> std::unique_ptr<DB::ReadBufferFromHDFS>
+            {
+                return std::make_unique<DB::ReadBufferFromHDFS>(
+                    hdfs_uri, hdfs_file_path, config, read_settings, 
read_util_position, true, object.bytes_size);
+            };
+
+            DB::StoredObjects 
stored_objects{DB::StoredObject{file_uri.getPath().substr(1), "", file_size}};
+            auto cache_hdfs_read = 
std::make_unique<DB::ReadBufferFromRemoteFSGather>(
+                std::move(hdfs_read_buffer_creator), stored_objects, "hdfs:", 
read_settings, nullptr, /* use_external_buffer */ false);
+            cache_hdfs_read->setReadUntilPosition(read_util_position);
+            read_buffer = std::move(cache_hdfs_read);
         }
+
+        if (set_read_util_position && read_begin)
+            if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer 
*>(read_buffer.get()))
+                seekable_in->seek(read_begin, SEEK_SET);
+
         return read_buffer;
     }
 
@@ -367,6 +391,7 @@ public:
         result.second = get_next_line_pos(fs.get(), fin, read_end_pos, 
hdfs_file_size);
         return result;
     }
+
 private:
     DB::ContextPtr context;
 };
@@ -382,23 +407,19 @@ public:
     explicit S3FileReadBufferBuilder(DB::ContextPtr context_) : 
ReadBufferBuilder(context_)
     {
         auto config = S3Config::loadFromContext(context);
-        new_settings = context->getReadSettings();
-        new_settings.enable_filesystem_cache = config.s3_local_cache_enabled;
-
-        if (new_settings.enable_filesystem_cache)
+        // The gluten_cache.local.* config takes priority; fall back to the legacy s3 local cache settings.
+        if (!file_cache && config.s3_local_cache_enabled)
         {
             DB::FileCacheSettings file_cache_settings;
             file_cache_settings.max_size = config.s3_local_cache_max_size;
             auto cache_base_path = config.s3_local_cache_cache_path;
 
-            if (!fs::exists(cache_base_path))
-                fs::create_directories(cache_base_path);
+            if (!std::filesystem::exists(cache_base_path))
+                std::filesystem::create_directories(cache_base_path);
 
             file_cache_settings.base_path = cache_base_path;
             file_cache = 
DB::FileCacheFactory::instance().getOrCreate("s3_local_cache", 
file_cache_settings, "");
             file_cache->initialize();
-
-            new_settings.remote_fs_cache = file_cache;
         }
     }
 
@@ -407,6 +428,7 @@ public:
     std::unique_ptr<DB::ReadBuffer>
     build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position) override
     {
+        DB::ReadSettings read_settings = getReadSettings(context);
         Poco::URI file_uri(file_info.uri_file());
         // file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
         const std::string& bucket = file_uri.getHost();
@@ -416,9 +438,8 @@ public:
         size_t object_size = object_info.size;
         Int64 object_modified_time = object_info.last_modification_time;
 
-        if (new_settings.enable_filesystem_cache)
+        if (read_settings.enable_filesystem_cache)
         {
-
             auto file_cache_key = DB::FileCacheKey(key);
             auto last_cache_time = files_cache_time_map.get(file_cache_key);
             // quick check
@@ -436,7 +457,7 @@ public:
         }
 
         auto read_buffer_creator
-            = [bucket, client, this](bool restricted_seek, const 
DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
+            = [bucket, client, read_settings, this](bool restricted_seek, 
const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
         {
             return std::make_unique<DB::ReadBufferFromS3>(
                 client,
@@ -444,7 +465,7 @@ public:
                 object.remote_path,
                 "",
                 DB::S3::RequestSettings(),
-                new_settings,
+                read_settings,
                 /* use_external_buffer */ true,
                 /* offset */ 0,
                 /* read_until_position */0,
@@ -453,11 +474,11 @@ public:
 
         DB::StoredObjects stored_objects{DB::StoredObject{key, "", 
object_size}};
         auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
-            std::move(read_buffer_creator), stored_objects, "s3:" + bucket + 
"/", new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
+            std::move(read_buffer_creator), stored_objects, "s3:" + bucket + 
"/", read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
 
         auto & pool_reader = 
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
         auto async_reader
-            = 
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), 
pool_reader, new_settings, nullptr, nullptr);
+            = 
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl), 
pool_reader, read_settings, nullptr, nullptr);
 
         if (set_read_util_position)
         {
@@ -478,7 +499,7 @@ public:
             async_reader->setReadUntilEnd();
         }
 
-        if (new_settings.remote_fs_prefetch)
+        if (read_settings.remote_fs_prefetch)
             async_reader->prefetch(Priority{});
 
         return async_reader;
@@ -488,7 +509,6 @@ private:
     static const std::string SHARED_CLIENT_KEY;
     static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>> 
per_bucket_clients;
     static FileCacheConcurrentMap files_cache_time_map;
-    DB::ReadSettings new_settings;
     DB::FileCachePtr file_cache;
 
     std::string & stripQuote(std::string & s)
@@ -732,6 +752,57 @@ void registerReadBufferBuilders()
 #endif
 }
 
+ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr context_) : 
context(context_)
+{
+    const auto & config = context->getConfigRef();
+    if (config.getBool("gluten_cache.local.enabled", false))
+    {
+        DB::FileCacheSettings file_cache_settings;
+
+        file_cache_settings.loadFromConfig(config, "gluten_cache.local");
+
+        if (std::filesystem::path(file_cache_settings.base_path).is_relative())
+            file_cache_settings.base_path = 
std::filesystem::path(context->getPath()) / "caches" / 
file_cache_settings.base_path;
+
+        if (!std::filesystem::exists(file_cache_settings.base_path))
+            std::filesystem::create_directories(file_cache_settings.base_path);
+
+        auto name = config.getString("gluten_cache.local.name");
+        auto * config_prefix = "";
+        file_cache = DB::FileCacheFactory::instance().getOrCreate(name, 
file_cache_settings, config_prefix);
+        file_cache->initialize();
+    }
+}
+
+DB::ReadSettings ReadBufferBuilder::getReadSettings(DB::ContextPtr context) 
const
+{
+    DB::ReadSettings read_settings = context->getReadSettings();
+    if (file_cache)
+    {
+        read_settings.enable_filesystem_cache = true;
+        read_settings.remote_fs_cache = file_cache;
+    }
+    else
+    {
+        read_settings.enable_filesystem_cache = false;
+    }
+
+    return read_settings;
+}
+
+
+std::unique_ptr<DB::ReadBuffer>
+ReadBufferBuilder::buildWithCompressionWrapper(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position)
+{
+    auto in = build(file_info, set_read_util_position);
+
+    /// Wrap the read buffer with compression method if exists
+    Poco::URI file_uri(file_info.uri_file());
+    DB::CompressionMethod compression = 
DB::chooseCompressionMethod(file_uri.getPath(), "auto");
+    return compression != DB::CompressionMethod::None ? 
DB::wrapReadBufferWithCompressionMethod(std::move(in), compression) : 
std::move(in);
+}
+
+
 ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
 {
     static ReadBufferBuilderFactory instance;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index f5218f0aa..92d8d41c1 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -15,21 +15,21 @@
  * limitations under the License.
  */
 #pragma once
+
 #include <functional>
 #include <memory>
 #include <IO/ReadBuffer.h>
-#include <Interpreters/Context.h>
-#include <Interpreters/Context_fwd.h>
-#include <boost/core/noncopyable.hpp>
 #include <substrait/plan.pb.h>
-#include <Poco/URI.h>
+
 
 namespace local_engine
 {
+
 class ReadBufferBuilder
 {
 public:
-    explicit ReadBufferBuilder(DB::ContextPtr context_) : context(context_) { }
+    explicit ReadBufferBuilder(DB::ContextPtr context_);
+
     virtual ~ReadBufferBuilder() = default;
 
     /// build a new read buffer
@@ -37,19 +37,14 @@ public:
     build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position = false) = 0;
 
     /// build a new read buffer, consider compression method
-    std::unique_ptr<DB::ReadBuffer> buildWithCompressionWrapper(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position = false)
-    {
-        auto in = build(file_info, set_read_util_position);
-
-        /// Wrap the read buffer with compression method if exists
-        Poco::URI file_uri(file_info.uri_file());
-        DB::CompressionMethod compression = 
DB::chooseCompressionMethod(file_uri.getPath(), "auto");
-        return compression != DB::CompressionMethod::None ? 
DB::wrapReadBufferWithCompressionMethod(std::move(in), compression)
-                                                          : std::move(in);
-    }
+    std::unique_ptr<DB::ReadBuffer> buildWithCompressionWrapper(const 
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool 
set_read_util_position = false);
 
 protected:
+    DB::ReadSettings getReadSettings(DB::ContextPtr context) const;
     DB::ContextPtr context;
+
+public:
+    DB::FileCachePtr file_cache = nullptr;
 };
 
 using ReadBufferBuilderPtr = std::shared_ptr<ReadBufferBuilder>;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 3c3d6d4f8..f27da2f92 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1292,6 +1292,30 @@ JNIEXPORT jobject 
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeGe
     return local_engine::CacheManager::instance().getCacheStatus(env, 
jstring2string(env, id));
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
 }
+
+JNIEXPORT jstring 
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheFiles(JNIEnv * 
env, jobject, jbyteArray files)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    const auto files_bytes = local_engine::getByteArrayElementsSafe(env, 
files);
+    const std::string::size_type files_bytes_size = files_bytes.length();
+    std::string_view files_view = {reinterpret_cast<const char 
*>(files_bytes.elems()), files_bytes_size};
+    substrait::ReadRel::LocalFiles local_files = 
local_engine::BinaryToMessage<substrait::ReadRel::LocalFiles>(files_view);
+
+    auto jobId = 
local_engine::CacheManager::instance().cacheFiles(local_files);
+    return local_engine::charTojstring(env, jobId.c_str());
+    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
+}
+
+JNIEXPORT void 
Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, 
jobject, jstring file_, jstring cache_name_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    auto file = jstring2string(env, file_);
+    auto cache_name = jstring2string(env, cache_name_);
+
+    local_engine::CacheManager::removeFiles(file, cache_name);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
 #ifdef __cplusplus
 }
 
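End to end, GlutenFilesCacheLoad carries the serialized LocalFiles to each executor, the JNI entry point above starts a cache job and returns its id, and GlutenCacheLoadStatus is polled until that job succeeds or fails. A rough sketch of the executor-side dispatch; the Java wrappers cacheFiles/getCacheStatus on CHNativeCacheManager and the CacheJobInfo field layout are assumptions, not shown in this excerpt:

    import org.apache.gluten.execution.{CacheResult, CHNativeCacheManager}
    import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo, GlutenCacheLoadStatus, GlutenFilesCacheLoad}

    def handleCacheMessage(msg: Any, reply: Any => Unit): Unit = msg match {
      case GlutenFilesCacheLoad(files) =>
        // assumed wrapper over nativeCacheFiles: starts the native cache job, returns its job id
        val jobId: String = CHNativeCacheManager.cacheFiles(files)
        reply(CacheJobInfo(status = true, jobId = jobId)) // field names assumed
      case GlutenCacheLoadStatus(jobId) =>
        // assumed wrapper over nativeGetCacheStatus; CacheResult.Status is SUCCESS, ERROR, or still running
        val result: CacheResult = CHNativeCacheManager.getCacheStatus(jobId)
        reply(result)
      case _ =>
    }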

