This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 371d448c3 [GLUTEN-6840][CH] Enable cache files for hdfs (#6841)
371d448c3 is described below
commit 371d448c3b512b1cbb3272e932cb7a57b9da4ba6
Author: Shuai li <[email protected]>
AuthorDate: Wed Aug 21 14:40:54 2024 +0800
[GLUTEN-6840][CH] Enable cache files for hdfs (#6841)
---
.../gluten/sql/parser/GlutenCacheFileSqlBase.g4 | 224 +++++++++++++++++++++
.../gluten/parser/GlutenCacheFilesSqlParser.scala | 61 ++++++
.../gluten/parser/GlutenCacheFilesSqlParser.scala | 65 ++++++
.../gluten/parser/GlutenCacheFilesSqlParser.scala | 65 ++++++
.../gluten/execution/CHNativeCacheManager.java | 5 +
.../org/apache/gluten/execution/CacheResult.java | 4 +-
.../org/apache/gluten/metrics/MetricsStep.java | 66 ++++++
.../backendsapi/clickhouse/CHIteratorApi.scala | 17 +-
.../backendsapi/clickhouse/CHMetricsApi.scala | 20 +-
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 4 +-
.../metrics/FileSourceScanMetricsUpdater.scala | 24 ++-
...se.scala => GlutenCacheFileSqlParserBase.scala} | 132 +++++-------
.../parser/GlutenClickhouseSqlParserBase.scala | 20 +-
.../apache/gluten/parser/UpperCaseCharStream.scala | 38 ++++
.../apache/spark/rpc/GlutenExecutorEndpoint.scala | 13 +-
.../org/apache/spark/rpc/GlutenRpcMessages.scala | 6 +-
.../commands/GlutenCHCacheDataCommand.scala | 100 +--------
.../sql/execution/commands/GlutenCacheBase.scala | 123 +++++++++++
.../commands/GlutenCacheFilesCommand.scala | 188 +++++++++++++++++
...lutenClickHouseWholeStageTransformerSuite.scala | 3 +
.../execution/tpch/GlutenClickHouseHDFSSuite.scala | 134 ++++++++++++
cpp-ch/local-engine/Common/CHUtil.cpp | 36 ++--
cpp-ch/local-engine/Common/CHUtil.h | 2 +
cpp-ch/local-engine/Common/GlutenConfig.h | 30 ++-
cpp-ch/local-engine/Common/QueryContext.cpp | 13 +-
cpp-ch/local-engine/Common/QueryContext.h | 1 +
cpp-ch/local-engine/Parser/RelMetric.cpp | 55 ++++-
.../local-engine/Storages/Cache/CacheManager.cpp | 67 +++++-
cpp-ch/local-engine/Storages/Cache/CacheManager.h | 8 +
.../Storages/SubstraitSource/ReadBufferBuilder.cpp | 163 ++++++++++-----
.../Storages/SubstraitSource/ReadBufferBuilder.h | 25 +--
cpp-ch/local-engine/local_engine_jni.cpp | 24 +++
32 files changed, 1427 insertions(+), 309 deletions(-)
diff --git
a/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
new file mode 100644
index 000000000..abdb0cdbf
--- /dev/null
+++
b/backends-clickhouse/src/main/antlr4/org/apache/gluten/sql/parser/GlutenCacheFileSqlBase.g4
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+grammar GlutenCacheFileSqlBase;
+
+@members {
+ /**
+ * Verify whether current token is a valid decimal token (which contains
dot).
+ * Returns true if the character that follows the token is not a digit or
letter or underscore.
+ *
+ * For example:
+ * For char stream "2.3", "2." is not a valid decimal token, because it is
followed by digit '3'.
+ * For char stream "2.3_", "2.3" is not a valid decimal token, because it is
followed by '_'.
+ * For char stream "2.3W", "2.3" is not a valid decimal token, because it is
followed by 'W'.
+ * For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token
because it is followed
+ * by a space. 34.E2 is a valid decimal token because it is followed by
symbol '+'
+ * which is not a digit or letter or underscore.
+ */
+ public boolean isValidDecimal() {
+ int nextChar = _input.LA(1);
+ if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <=
'9' ||
+ nextChar == '_') {
+ return false;
+ } else {
+ return true;
+ }
+ }
+}
+
+tokens {
+ DELIMITER
+}
+
+singleStatement
+ : statement ';'* EOF
+ ;
+
+statement
+ : CACHE FILES ASYNC? SELECT selectedColumns=selectedColumnNames
+ FROM (path=STRING)
+ (CACHEPROPERTIES cacheProps=propertyList)?
#cacheFiles
+ | .*?
#passThrough
+ ;
+
+selectedColumnNames
+ : ASTERISK
+ | identifier (COMMA identifier)*
+ ;
+
+propertyList
+ : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+ ;
+
+property
+ : key=propertyKey (EQ? value=propertyValue)?
+ ;
+
+propertyKey
+ : identifier (DOT identifier)*
+ | stringLit
+ ;
+
+propertyValue
+ : INTEGER_VALUE
+ | DECIMAL_VALUE
+ | booleanValue
+ | identifier LEFT_PAREN stringLit COMMA stringLit RIGHT_PAREN
+ | value=stringLit
+ ;
+
+stringLit
+ : STRING
+ | DOUBLEQUOTED_STRING
+ ;
+
+booleanValue
+ : TRUE | FALSE
+ ;
+
+identifier
+ : IDENTIFIER #unquotedIdentifier
+ | quotedIdentifier #quotedIdentifierAlternative
+ | nonReserved #unquotedIdentifier
+ ;
+
+quotedIdentifier
+ : BACKQUOTED_IDENTIFIER
+ ;
+
+// Add keywords here so that people's queries don't break if they have a
column name as one of
+// these tokens
+nonReserved
+ : CACHE | FILES | ASYNC
+ | SELECT | FOR | AFTER | CACHEPROPERTIES
+ | TIMESTAMP | AS | OF | DATE_PARTITION
+ |
+ ;
+
+// Define how the keywords above should appear in a user's SQL statement.
+CACHE: 'CACHE';
+META: 'META';
+ASYNC: 'ASYNC';
+SELECT: 'SELECT';
+COMMA: ',';
+FOR: 'FOR';
+FROM: 'FROM';
+AFTER: 'AFTER';
+CACHEPROPERTIES: 'CACHEPROPERTIES';
+DOT: '.';
+ASTERISK: '*';
+TIMESTAMP: 'TIMESTAMP';
+AS: 'AS';
+OF: 'OF';
+DATE_PARTITION: 'DATE_PARTITION';
+LEFT_PAREN: '(';
+RIGHT_PAREN: ')';
+TRUE: 'TRUE';
+FALSE: 'FALSE';
+FILES: 'FILES';
+
+EQ : '=' | '==';
+NSEQ: '<=>';
+NEQ : '<>';
+NEQJ: '!=';
+LTE : '<=' | '!>';
+GTE : '>=' | '!<';
+CONCAT_PIPE: '||';
+
+STRING
+ : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+ | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ ;
+
+DOUBLEQUOTED_STRING
+ :'"' ( ~('"'|'\\') | ('\\' .) )* '"'
+ ;
+
+BIGINT_LITERAL
+ : DIGIT+ 'L'
+ ;
+
+SMALLINT_LITERAL
+ : DIGIT+ 'S'
+ ;
+
+TINYINT_LITERAL
+ : DIGIT+ 'Y'
+ ;
+
+INTEGER_VALUE
+ : DIGIT+
+ ;
+
+DECIMAL_VALUE
+ : DIGIT+ EXPONENT
+ | DECIMAL_DIGITS EXPONENT? {isValidDecimal()}?
+ ;
+
+DOUBLE_LITERAL
+ : DIGIT+ EXPONENT? 'D'
+ | DECIMAL_DIGITS EXPONENT? 'D' {isValidDecimal()}?
+ ;
+
+BIGDECIMAL_LITERAL
+ : DIGIT+ EXPONENT? 'BD'
+ | DECIMAL_DIGITS EXPONENT? 'BD' {isValidDecimal()}?
+ ;
+
+IDENTIFIER
+ : (LETTER | DIGIT | '_')+
+ ;
+
+BACKQUOTED_IDENTIFIER
+ : '`' ( ~'`' | '``' )* '`'
+ ;
+
+fragment DECIMAL_DIGITS
+ : DIGIT+ '.' DIGIT*
+ | '.' DIGIT+
+ ;
+
+fragment EXPONENT
+ : 'E' [+-]? DIGIT+
+ ;
+
+fragment DIGIT
+ : [0-9]
+ ;
+
+fragment LETTER
+ : [A-Z]
+ ;
+
+SIMPLE_COMMENT
+ : '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
+ ;
+
+BRACKETED_COMMENT
+ : '/*' .*? '*/' -> channel(HIDDEN)
+ ;
+
+WS : [ \r\n\t]+ -> channel(HIDDEN)
+ ;
+
+// Catch-all for anything we can't recognize.
+// We use this to be able to ignore and recover all the text
+// when splitting statements with DelimiterLexer
+UNRECOGNIZED
+ : .
+ ;
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 000000000..48d26498f
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-20/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenCacheFileSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 000000000..9a0cde772
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-23/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenCacheFileSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
diff --git
a/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
new file mode 100644
index 000000000..9a0cde772
--- /dev/null
+++
b/backends-clickhouse/src/main/delta-32/org/apache/gluten/parser/GlutenCacheFilesSqlParser.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types.{DataType, StructType}
+
+class GlutenCacheFilesSqlParser(spark: SparkSession, delegate: ParserInterface)
+ extends GlutenCacheFileSqlParserBase {
+
+ override def parsePlan(sqlText: String): LogicalPlan =
+ parse(sqlText) {
+ parser =>
+ astBuilder.visit(parser.singleStatement()) match {
+ case plan: LogicalPlan => plan
+ case _ => delegate.parsePlan(sqlText)
+ }
+ }
+
+ override def parseExpression(sqlText: String): Expression = {
+ delegate.parseExpression(sqlText)
+ }
+
+ override def parseTableIdentifier(sqlText: String): TableIdentifier = {
+ delegate.parseTableIdentifier(sqlText)
+ }
+
+ override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = {
+ delegate.parseFunctionIdentifier(sqlText)
+ }
+
+ override def parseMultipartIdentifier(sqlText: String): Seq[String] = {
+ delegate.parseMultipartIdentifier(sqlText)
+ }
+
+ override def parseTableSchema(sqlText: String): StructType = {
+ delegate.parseTableSchema(sqlText)
+ }
+
+ override def parseDataType(sqlText: String): DataType = {
+ delegate.parseDataType(sqlText)
+ }
+
+ override def parseQuery(sqlText: String): LogicalPlan = {
+ delegate.parseQuery(sqlText)
+ }
+}
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 7b765924f..4033d8c6b 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -30,4 +30,9 @@ public class CHNativeCacheManager {
}
private static native CacheResult nativeGetCacheStatus(String jobId);
+
+ public static native String nativeCacheFiles(byte[] files);
+
+ // only for ut
+ public static native void removeFiles(String file, String cacheName);
}
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
index 0fa69e0d0..b6d538039 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CacheResult.java
@@ -16,7 +16,9 @@
*/
package org.apache.gluten.execution;
-public class CacheResult {
+import java.io.Serializable;
+
+public class CacheResult implements Serializable {
public enum Status {
RUNNING(0),
SUCCESS(1),
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
index f95aaa323..39dd94965 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/MetricsStep.java
@@ -35,6 +35,24 @@ public class MetricsStep {
@JsonProperty("selected_marks")
protected long selectedMarks;
+ @JsonProperty("read_cache_hits")
+ protected long readCacheHits;
+
+ @JsonProperty("miss_cache_hits")
+ protected long missCacheHits;
+
+ @JsonProperty("read_cache_bytes")
+ protected long readCacheBytes;
+
+ @JsonProperty("read_miss_bytes")
+ protected long readMissBytes;
+
+ @JsonProperty("read_cache_millisecond")
+ protected long readCacheMillisecond;
+
+ @JsonProperty("miss_cache_millisecond")
+ protected long missCacheMillisecond;
+
public String getName() {
return name;
}
@@ -82,4 +100,52 @@ public class MetricsStep {
public long getSelectedMarksPk() {
return selectedMarksPk;
}
+
+ public long getReadCacheHits() {
+ return readCacheHits;
+ }
+
+ public void setReadCacheHits(long readCacheHits) {
+ this.readCacheHits = readCacheHits;
+ }
+
+ public long getMissCacheHits() {
+ return missCacheHits;
+ }
+
+ public void setMissCacheHits(long missCacheHits) {
+ this.missCacheHits = missCacheHits;
+ }
+
+ public long getReadCacheBytes() {
+ return readCacheBytes;
+ }
+
+ public void setReadCacheBytes(long readCacheBytes) {
+ this.readCacheBytes = readCacheBytes;
+ }
+
+ public long getReadMissBytes() {
+ return readMissBytes;
+ }
+
+ public void setReadMissBytes(long readMissBytes) {
+ this.readMissBytes = readMissBytes;
+ }
+
+ public long getReadCacheMillisecond() {
+ return readCacheMillisecond;
+ }
+
+ public void setReadCacheMillisecond(long readCacheMillisecond) {
+ this.readCacheMillisecond = readCacheMillisecond;
+ }
+
+ public long getMissCacheMillisecond() {
+ return missCacheMillisecond;
+ }
+
+ public void setMissCacheMillisecond(long missCacheMillisecond) {
+ this.missCacheMillisecond = missCacheMillisecond;
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 7519580b9..c77d57262 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -22,6 +22,7 @@ import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.metrics.{IMetrics, NativeMetrics}
+import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.plan.PlanNode
import org.apache.gluten.substrait.rel._
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -164,6 +165,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]()
val lengths = new JArrayList[JLong]()
+ val fileSizes = new JArrayList[JLong]()
+ val modificationTimes = new JArrayList[JLong]()
val partitionColumns = new JArrayList[JMap[String, String]]
f.files.foreach {
file =>
@@ -173,6 +176,16 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
// TODO: Support custom partition location
val partitionColumn = new JHashMap[String, String]()
partitionColumns.add(partitionColumn)
+ val (fileSize, modificationTime) =
+
SparkShimLoader.getSparkShims.getFileSizeAndModificationTime(file)
+ (fileSize, modificationTime) match {
+ case (Some(size), Some(time)) =>
+ fileSizes.add(JLong.valueOf(size))
+ modificationTimes.add(JLong.valueOf(time))
+ case _ =>
+ fileSizes.add(0)
+ modificationTimes.add(0)
+ }
}
val preferredLocations =
CHAffinity.getFilePartitionLocations(paths.asScala.toArray,
f.preferredLocations())
@@ -181,8 +194,8 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
paths,
starts,
lengths,
- new JArrayList[JLong](),
- new JArrayList[JLong](),
+ fileSizes,
+ modificationTimes,
partitionColumns,
new JArrayList[JMap[String, String]](),
fileFormat,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 9058bffd8..c53448cdd 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -128,7 +128,25 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra
operators time"),
"selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected
marks primary"),
"selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected
marks"),
- "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks
primary")
+ "totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks
primary"),
+ "readCacheHits" -> SQLMetrics.createMetric(
+ sparkContext,
+ "Number of times the read from filesystem cache hit the cache"),
+ "missCacheHits" -> SQLMetrics.createMetric(
+ sparkContext,
+ "Number of times the read from filesystem cache miss the cache"),
+ "readCacheBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "Bytes read from filesystem cache"),
+ "readMissBytes" -> SQLMetrics.createSizeMetric(
+ sparkContext,
+ "Bytes read from filesystem cache source (from remote fs, etc)"),
+ "readCacheMillisecond" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "Time reading from filesystem cache"),
+ "missCacheMillisecond" -> SQLMetrics.createTimingMetric(
+ sparkContext,
+ "Time reading from filesystem cache source (from remote filesystem,
etc)")
)
override def genFileSourceScanTransformerMetricsUpdater(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 177d6a6f0..f4a7522d3 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -24,7 +24,7 @@ import
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions,
RemoveTransitions}
import org.apache.gluten.extension.injector.{RuleInjector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector,
RasInjector}
-import org.apache.gluten.parser.GlutenClickhouseSqlParser
+import org.apache.gluten.parser.{GlutenCacheFilesSqlParser,
GlutenClickhouseSqlParser}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
@@ -44,6 +44,8 @@ private object CHRuleApi {
def injectSpark(injector: SparkInjector): Unit = {
// Regular Spark rules.
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
+ injector.injectParser(
+ (spark, parserInterface) => new GlutenCacheFilesSqlParser(spark,
parserInterface))
injector.injectParser(
(spark, parserInterface) => new GlutenClickhouseSqlParser(spark,
parserInterface))
injector.injectResolutionRule(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
index f44c5ed1a..4dcae8feb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/FileSourceScanMetricsUpdater.scala
@@ -35,9 +35,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
val extraTime: SQLMetric = metrics("extraTime")
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")
- val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
- val selected_marks: SQLMetric = metrics("selectedMarks")
- val total_marks_pk: SQLMetric = metrics("totalMarksPk")
+ val selectedMarksPK: SQLMetric = metrics("selectedMarksPk")
+ val selectedMarks: SQLMetric = metrics("selectedMarks")
+ val totalMarksPK: SQLMetric = metrics("totalMarksPk")
+ val readCacheHits: SQLMetric = metrics("readCacheHits")
+ val missCacheHits: SQLMetric = metrics("missCacheHits")
+ val readCacheBytes: SQLMetric = metrics("readCacheBytes")
+ val readMissBytes: SQLMetric = metrics("readMissBytes")
+ val readCacheMillisecond: SQLMetric = metrics("readCacheMillisecond")
+ val missCacheMillisecond: SQLMetric = metrics("missCacheMillisecond")
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
// inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value)
@@ -56,9 +62,15 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
metricsData.getSteps.forEach(
step => {
- selected_marks_pk += step.selectedMarksPk
- selected_marks += step.selectedMarks
- total_marks_pk += step.totalMarksPk
+ selectedMarksPK += step.selectedMarksPk
+ selectedMarks += step.selectedMarks
+ totalMarksPK += step.totalMarksPk
+ readCacheHits += step.readCacheHits
+ missCacheHits += step.missCacheHits
+ readCacheBytes += step.readCacheBytes
+ readMissBytes += step.readMissBytes
+ readCacheMillisecond += step.readCacheMillisecond
+ missCacheMillisecond += step.missCacheMillisecond
})
MetricsUtil.updateExtraTimeMetric(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
similarity index 57%
copy from
backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
copy to
backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
index 18fc102be..b031dcf7a 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenCacheFileSqlParserBase.scala
@@ -16,40 +16,38 @@
*/
package org.apache.gluten.parser
-import org.apache.gluten.sql.parser.{GlutenClickhouseSqlBaseBaseListener,
GlutenClickhouseSqlBaseBaseVisitor, GlutenClickhouseSqlBaseLexer,
GlutenClickhouseSqlBaseParser}
-import org.apache.gluten.sql.parser.GlutenClickhouseSqlBaseParser._
+import org.apache.gluten.sql.parser.{GlutenCacheFileSqlBaseBaseListener,
GlutenCacheFileSqlBaseBaseVisitor, GlutenCacheFileSqlBaseLexer,
GlutenCacheFileSqlBaseParser}
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{ParseErrorListener,
ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
-import org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand
+import org.apache.spark.sql.execution.commands.GlutenCacheFilesCommand
import org.apache.spark.sql.internal.VariableSubstitution
import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
-import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import java.util.Locale
import scala.collection.JavaConverters._
-trait GlutenClickhouseSqlParserBase extends ParserInterface {
-
- protected val astBuilder = new GlutenClickhouseSqlAstBuilder
+trait GlutenCacheFileSqlParserBase extends ParserInterface {
+ protected val astBuilder = new GlutenCacheFileSqlAstBuilder
protected val substitution = new VariableSubstitution
- protected def parse[T](command: String)(toResult:
GlutenClickhouseSqlBaseParser => T): T = {
- val lexer = new GlutenClickhouseSqlBaseLexer(
+ protected def parse[T](command: String)(toResult:
GlutenCacheFileSqlBaseParser => T): T = {
+ val lexer = new GlutenCacheFileSqlBaseLexer(
new
UpperCaseCharStream(CharStreams.fromString(substitution.substitute(command))))
lexer.removeErrorListeners()
lexer.addErrorListener(ParseErrorListener)
val tokenStream = new CommonTokenStream(lexer)
- val parser = new GlutenClickhouseSqlBaseParser(tokenStream)
- parser.addParseListener(PostProcessor)
+ val parser = new GlutenCacheFileSqlBaseParser(tokenStream)
+
+ parser.addParseListener(GlutenCacheFileSqlPostProcessor)
parser.removeErrorListeners()
parser.addErrorListener(ParseErrorListener)
@@ -80,33 +78,34 @@ trait GlutenClickhouseSqlParserBase extends ParserInterface
{
message = e.message,
start = position,
stop = position,
- errorClass = Some("GLUTEN_CH_PARSING_ANALYSIS_ERROR"))
+ errorClass = Some("GLUTEN_CACHE_FILE_PARSING_ANALYSIS_ERROR"))
}
}
}
-class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[AnyRef] {
-
+class GlutenCacheFileSqlAstBuilder extends
GlutenCacheFileSqlBaseBaseVisitor[AnyRef] {
import org.apache.spark.sql.catalyst.parser.ParserUtils._
/** Convert a property list into a key-value map. */
- override def visitPropertyList(ctx: PropertyListContext): Map[String,
String] = withOrigin(ctx) {
- val properties = ctx.property.asScala.map {
- property =>
- val key = visitPropertyKey(property.key)
- val value = visitPropertyValue(property.value)
- key -> value
+ override def visitPropertyList(
+ ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String,
String] =
+ withOrigin(ctx) {
+ val properties = ctx.property.asScala.map {
+ property =>
+ val key = visitPropertyKey(property.key)
+ val value = visitPropertyValue(property.value)
+ key -> value
+ }
+ // Check for duplicate property names.
+ checkDuplicateKeys(properties.toSeq, ctx)
+ properties.toMap
}
- // Check for duplicate property names.
- checkDuplicateKeys(properties.toSeq, ctx)
- properties.toMap
- }
/**
* A property key can either be String or a collection of dot separated
elements. This function
* extracts the property key based on whether its a string literal or a
property identifier.
*/
- override def visitPropertyKey(key: PropertyKeyContext): String = {
+ override def visitPropertyKey(key:
GlutenCacheFileSqlBaseParser.PropertyKeyContext): String = {
if (key.stringLit() != null) {
string(visitStringLit(key.stringLit()))
} else {
@@ -118,7 +117,8 @@ class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[A
* A property value can be String, Integer, Boolean or Decimal. This
function extracts the
* property value based on whether its a string, integer, boolean or decimal
literal.
*/
- override def visitPropertyValue(value: PropertyValueContext): String = {
+ override def visitPropertyValue(
+ value: GlutenCacheFileSqlBaseParser.PropertyValueContext): String = {
if (value == null) {
null
} else if (value.identifier != null) {
@@ -132,7 +132,8 @@ class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[A
}
}
- def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+ def visitPropertyKeyValues(
+ ctx: GlutenCacheFileSqlBaseParser.PropertyListContext): Map[String,
String] = {
val props = visitPropertyList(ctx)
val badKeys = props.collect { case (key, null) => key }
if (badKeys.nonEmpty) {
@@ -143,7 +144,7 @@ class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[A
props
}
- override def visitStringLit(ctx: StringLitContext): Token = {
+ override def visitStringLit(ctx:
GlutenCacheFileSqlBaseParser.StringLitContext): Token = {
if (ctx != null) {
if (ctx.STRING != null) {
ctx.STRING.getSymbol
@@ -156,58 +157,32 @@ class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[A
}
override def visitSingleStatement(
- ctx: GlutenClickhouseSqlBaseParser.SingleStatementContext): AnyRef =
withOrigin(ctx) {
+ ctx: GlutenCacheFileSqlBaseParser.SingleStatementContext): AnyRef =
withOrigin(ctx) {
visit(ctx.statement).asInstanceOf[LogicalPlan]
}
- override def visitCacheData(ctx:
GlutenClickhouseSqlBaseParser.CacheDataContext): AnyRef =
+ override def visitCacheFiles(ctx:
GlutenCacheFileSqlBaseParser.CacheFilesContext): AnyRef =
withOrigin(ctx) {
- val onlyMetaCache = ctx.META != null
val asynExecute = ctx.ASYNC != null
- val (tsfilter, partitionColumn, partitionValue) = if (ctx.AFTER != null)
{
- if (ctx.filter.TIMESTAMP != null) {
- (Some(string(ctx.filter.timestamp)), None, None)
- } else if (ctx.filter.datepartition != null && ctx.filter.datetime !=
null) {
- (None, Some(ctx.filter.datepartition.getText),
Some(string(ctx.filter.datetime)))
- } else {
- throw new ParseException(s"Illegal filter value ${ctx.getText}", ctx)
- }
- } else {
- (None, None, None)
- }
val selectedColuman = visitSelectedColumnNames(ctx.selectedColumns)
- val tablePropertyOverrides = Option(ctx.cacheProps)
+ val propertyOverrides = Option(ctx.cacheProps)
.map(visitPropertyKeyValues)
.getOrElse(Map.empty[String, String])
+ val path = ctx.path.getText
- GlutenCHCacheDataCommand(
- onlyMetaCache,
+ GlutenCacheFilesCommand(
asynExecute,
selectedColuman,
- Option(ctx.path).map(string),
- Option(ctx.table).map(visitTableIdentifier),
- tsfilter,
- partitionColumn,
- partitionValue,
- tablePropertyOverrides
+ path.substring(1, path.length - 1),
+ propertyOverrides
)
}
- override def visitPassThrough(ctx:
GlutenClickhouseSqlBaseParser.PassThroughContext): AnyRef =
+ override def visitPassThrough(ctx:
GlutenCacheFileSqlBaseParser.PassThroughContext): AnyRef =
null
- protected def visitTableIdentifier(ctx: QualifiedNameContext):
TableIdentifier = withOrigin(ctx) {
- ctx.identifier.asScala.toSeq match {
- case Seq(tbl) => TableIdentifier(tbl.getText)
- case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText))
- // TODO: Spark 3.5 supports catalog parameter
- // case Seq(catalog, db, tbl) =>
- // TableIdentifier(tbl.getText, Some(db.getText),
Some(catalog.getText))
- case _ => throw new ParseException(s"Illegal table name ${ctx.getText}",
ctx)
- }
- }
-
- override def visitSelectedColumnNames(ctx: SelectedColumnNamesContext):
Option[Seq[String]] =
+ override def visitSelectedColumnNames(
+ ctx: GlutenCacheFileSqlBaseParser.SelectedColumnNamesContext):
Option[Seq[String]] =
withOrigin(ctx) {
if (ctx != null) {
if (ctx.ASTERISK != null) {
@@ -224,10 +199,11 @@ class GlutenClickhouseSqlAstBuilder extends
GlutenClickhouseSqlBaseBaseVisitor[A
}
}
-case object PostProcessor extends GlutenClickhouseSqlBaseBaseListener {
+case object GlutenCacheFileSqlPostProcessor extends
GlutenCacheFileSqlBaseBaseListener {
/** Remove the back ticks from an Identifier. */
- override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {
+ override def exitQuotedIdentifier(
+ ctx: GlutenCacheFileSqlBaseParser.QuotedIdentifierContext): Unit = {
replaceTokenByIdentifier(ctx, 1) {
token =>
// Remove the double back ticks in the string.
@@ -237,7 +213,7 @@ case object PostProcessor extends
GlutenClickhouseSqlBaseBaseListener {
}
/** Treat non-reserved keywords as Identifiers. */
- override def exitNonReserved(ctx: NonReservedContext): Unit = {
+ override def exitNonReserved(ctx:
GlutenCacheFileSqlBaseParser.NonReservedContext): Unit = {
replaceTokenByIdentifier(ctx, 0)(identity)
}
@@ -248,7 +224,7 @@ case object PostProcessor extends
GlutenClickhouseSqlBaseBaseListener {
val token = ctx.getChild(0).getPayload.asInstanceOf[Token]
val newToken = new CommonToken(
new org.antlr.v4.runtime.misc.Pair(token.getTokenSource,
token.getInputStream),
- GlutenClickhouseSqlBaseParser.IDENTIFIER,
+ GlutenCacheFileSqlBaseParser.IDENTIFIER,
token.getChannel,
token.getStartIndex + stripMargins,
token.getStopIndex - stripMargins
@@ -256,21 +232,3 @@ case object PostProcessor extends
GlutenClickhouseSqlBaseBaseListener {
parent.addChild(new TerminalNodeImpl(f(newToken)))
}
}
-
-class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
- override def consume(): Unit = wrapped.consume
- override def getSourceName(): String = wrapped.getSourceName
- override def index(): Int = wrapped.index
- override def mark(): Int = wrapped.mark
- override def release(marker: Int): Unit = wrapped.release(marker)
- override def seek(where: Int): Unit = wrapped.seek(where)
- override def size(): Int = wrapped.size
-
- override def getText(interval: Interval): String = wrapped.getText(interval)
-
- override def LA(i: Int): Int = {
- val la = wrapped.LA(i)
- if (la == 0 || la == IntStream.EOF) la
- else Character.toUpperCase(la)
- }
-}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
index 18fc102be..4a3883c8c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/GlutenClickhouseSqlParserBase.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.internal.VariableSubstitution
import org.antlr.v4.runtime._
import org.antlr.v4.runtime.atn.PredictionMode
-import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
+import org.antlr.v4.runtime.misc.ParseCancellationException
import org.antlr.v4.runtime.tree.TerminalNodeImpl
import java.util.Locale
@@ -256,21 +256,3 @@ case object PostProcessor extends
GlutenClickhouseSqlBaseBaseListener {
parent.addChild(new TerminalNodeImpl(f(newToken)))
}
}
-
-class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
- override def consume(): Unit = wrapped.consume
- override def getSourceName(): String = wrapped.getSourceName
- override def index(): Int = wrapped.index
- override def mark(): Int = wrapped.mark
- override def release(marker: Int): Unit = wrapped.release(marker)
- override def seek(where: Int): Unit = wrapped.seek(where)
- override def size(): Int = wrapped.size
-
- override def getText(interval: Interval): String = wrapped.getText(interval)
-
- override def LA(i: Int): Int = {
- val la = wrapped.LA(i)
- if (la == 0 || la == IntStream.EOF) la
- else Character.toUpperCase(la)
- }
-}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
new file mode 100644
index 000000000..6ee2aac81
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/parser/UpperCaseCharStream.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.parser
+
+import org.antlr.v4.runtime.{CharStream, CodePointCharStream, IntStream}
+import org.antlr.v4.runtime.misc.Interval
+
+class UpperCaseCharStream(wrapped: CodePointCharStream) extends CharStream {
+ override def consume(): Unit = wrapped.consume
+ override def getSourceName(): String = wrapped.getSourceName
+ override def index(): Int = wrapped.index
+ override def mark(): Int = wrapped.mark
+ override def release(marker: Int): Unit = wrapped.release(marker)
+ override def seek(where: Int): Unit = wrapped.seek(where)
+ override def size(): Int = wrapped.size
+
+ override def getText(interval: Interval): String = wrapped.getText(interval)
+
+ override def LA(i: Int): Int = {
+ val la = wrapped.LA(i)
+ if (la == 0 || la == IntStream.EOF) la
+ else Character.toUpperCase(la)
+ }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 8a3bde235..7f2b94eea 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -79,9 +79,20 @@ class GlutenExecutorEndpoint(val executorId: String, val
conf: SparkConf)
context.reply(
CacheJobInfo(status = false, "", s"executor: $executorId cache
data failed."))
}
- case GlutenMergeTreeCacheLoadStatus(jobId) =>
+ case GlutenCacheLoadStatus(jobId) =>
val status = CHNativeCacheManager.getCacheStatus(jobId)
context.reply(status)
+ case GlutenFilesCacheLoad(files) =>
+ try {
+ val jobId = CHNativeCacheManager.nativeCacheFiles(files)
+ context.reply(CacheJobInfo(status = true, jobId))
+ } catch {
+ case e: Exception =>
+ context.reply(
+ CacheJobInfo(
+ status = false,
+ s"executor: $executorId cache data failed. ${e.getMessage}"))
+ }
case e =>
logError(s"Received unexpected message. $e")
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index 800b15b99..e596e94fe 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -39,8 +39,12 @@ object GlutenRpcMessages {
case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns:
util.Set[String])
extends GlutenRpcMessage
- case class GlutenMergeTreeCacheLoadStatus(jobId: String)
+ case class GlutenCacheLoadStatus(jobId: String)
case class CacheJobInfo(status: Boolean, jobId: String, reason: String = "")
extends GlutenRpcMessage
+
+ case class GlutenFilesCacheLoad(files: Array[Byte]) extends GlutenRpcMessage
+
+ case class GlutenFilesCacheLoadStatus(jobId: String)
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index f32d22d5e..bb3cb5acc 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -16,24 +16,20 @@
*/
package org.apache.spark.sql.execution.commands
-import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.CacheResult
-import org.apache.gluten.execution.CacheResult.Status
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.substrait.rel.ExtensionTableBuilder
import org.apache.spark.affinity.CHAffinity
import org.apache.spark.rpc.GlutenDriverEndpoint
-import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo,
GlutenMergeTreeCacheLoad, GlutenMergeTreeCacheLoadStatus}
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo,
GlutenMergeTreeCacheLoad}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, GreaterThanOrEqual, IsNotNull, Literal}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import
org.apache.spark.sql.execution.commands.GlutenCHCacheDataCommand.{checkExecutorId,
collectJobTriggerResult, toExecutorId, waitAllJobFinish, waitRpcResults}
+import org.apache.spark.sql.execution.commands.GlutenCacheBase._
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
-import org.apache.spark.util.ThreadUtils
import org.apache.hadoop.fs.Path
@@ -43,7 +39,6 @@ import java.util.{ArrayList => JList}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
-import scala.concurrent.duration.Duration
case class GlutenCHCacheDataCommand(
onlyMetaCache: Boolean,
@@ -140,9 +135,7 @@ case class GlutenCHCacheDataCommand(
val executorIdsToAddFiles =
scala.collection.mutable.Map[String, ArrayBuffer[AddMergeTreeParts]]()
val executorIdsToParts = scala.collection.mutable.Map[String, String]()
- executorIdsToAddFiles.put(
- GlutenCHCacheDataCommand.ALL_EXECUTORS,
- new ArrayBuffer[AddMergeTreeParts]())
+ executorIdsToAddFiles.put(ALL_EXECUTORS, new
ArrayBuffer[AddMergeTreeParts]())
selectedAddFiles.foreach(
addFile => {
val mergeTreePart = addFile.asInstanceOf[AddMergeTreeParts]
@@ -156,7 +149,7 @@ case class GlutenCHCacheDataCommand(
if (locations.isEmpty) {
// non soft affinity
- executorIdsToAddFiles(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+ executorIdsToAddFiles(ALL_EXECUTORS)
.append(mergeTreePart)
} else {
locations.foreach(
@@ -205,9 +198,9 @@ case class GlutenCHCacheDataCommand(
}
})
val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
- if (executorIdsToParts.contains(GlutenCHCacheDataCommand.ALL_EXECUTORS)) {
+ if (executorIdsToParts.contains(ALL_EXECUTORS)) {
// send all parts to all executors
- val tableMessage =
executorIdsToParts(GlutenCHCacheDataCommand.ALL_EXECUTORS)
+ val tableMessage = executorIdsToParts(ALL_EXECUTORS)
GlutenDriverEndpoint.executorDataMap.forEach(
(executorId, executor) => {
futureList.append(
@@ -230,86 +223,7 @@ case class GlutenCHCacheDataCommand(
)))
})
}
- val resultList = waitRpcResults(futureList)
- if (asynExecute) {
- val res = collectJobTriggerResult(resultList)
- Seq(Row(res._1, res._2.mkString(";")))
- } else {
- val res = waitAllJobFinish(resultList)
- Seq(Row(res._1, res._2))
- }
- }
-
-}
-
-object GlutenCHCacheDataCommand {
- private val ALL_EXECUTORS = "allExecutors"
-
- private def toExecutorId(executorId: String): String =
- executorId.split("_").last
-
- def waitAllJobFinish(jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean,
String) = {
- val res = collectJobTriggerResult(jobs)
- var status = res._1
- val messages = res._2
- jobs.foreach(
- job => {
- if (status) {
- var complete = false
- while (!complete) {
- Thread.sleep(5000)
- val future_result = GlutenDriverEndpoint.executorDataMap
- .get(toExecutorId(job._1))
- .executorEndpointRef
- .ask[CacheResult](GlutenMergeTreeCacheLoadStatus(job._2.jobId))
- val result = ThreadUtils.awaitResult(future_result, Duration.Inf)
- result.getStatus match {
- case Status.ERROR =>
- status = false
- messages.append(
- s"executor : {}, failed with message: {};",
- job._1,
- result.getMessage)
- complete = true
- case Status.SUCCESS =>
- complete = true
- case _ =>
- // still running
- }
- }
- }
- })
- (status, messages.mkString(";"))
- }
-
- private def collectJobTriggerResult(jobs: ArrayBuffer[(String,
CacheJobInfo)]) = {
- var status = true
- val messages = ArrayBuffer[String]()
- jobs.foreach(
- job => {
- if (!job._2.status) {
- messages.append(job._2.reason)
- status = false
- }
- })
- (status, messages)
- }
- private def waitRpcResults = (futureList: ArrayBuffer[(String,
Future[CacheJobInfo])]) => {
- val resultList = ArrayBuffer[(String, CacheJobInfo)]()
- futureList.foreach(
- f => {
- resultList.append((f._1, ThreadUtils.awaitResult(f._2, Duration.Inf)))
- })
- resultList
+ getResult(futureList, asynExecute)
}
-
- private def checkExecutorId(executorId: String): Unit = {
- if
(!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
- throw new GlutenException(
- s"executor $executorId not found," +
- s" all executors are
${GlutenDriverEndpoint.executorDataMap.toString}")
- }
- }
-
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
new file mode 100644
index 000000000..c4e9f51bc
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheBase.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.execution.CacheResult
+import org.apache.gluten.execution.CacheResult.Status
+
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo,
GlutenCacheLoadStatus}
+import org.apache.spark.sql.Row
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+
+object GlutenCacheBase {
+ def ALL_EXECUTORS: String = "allExecutors"
+
+ def toExecutorId(executorId: String): String =
+ executorId.split("_").last
+
+ protected def waitRpcResults
+ : ArrayBuffer[(String, Future[CacheJobInfo])] => ArrayBuffer[(String,
CacheJobInfo)] =
+ (futureList: ArrayBuffer[(String, Future[CacheJobInfo])]) => {
+ val resultList = ArrayBuffer[(String, CacheJobInfo)]()
+ futureList.foreach(
+ f => {
+ resultList.append((f._1, ThreadUtils.awaitResult(f._2,
Duration.Inf)))
+ })
+ resultList
+ }
+
+ def checkExecutorId(executorId: String): Unit = {
+ if
(!GlutenDriverEndpoint.executorDataMap.containsKey(toExecutorId(executorId))) {
+ throw new GlutenException(
+ s"executor $executorId not found," +
+ s" all executors are
${GlutenDriverEndpoint.executorDataMap.toString}")
+ }
+ }
+
+ def waitAllJobFinish(
+ jobs: ArrayBuffer[(String, CacheJobInfo)],
+ ask: (String, String) => Future[CacheResult]): (Boolean, String) = {
+ val res = collectJobTriggerResult(jobs)
+ var status = res._1
+ val messages = res._2
+ jobs.foreach(
+ job => {
+ if (status) {
+ var complete = false
+ while (!complete) {
+ Thread.sleep(5000)
+ val future_result = ask(job._1, job._2.jobId)
+ val result = ThreadUtils.awaitResult(future_result, Duration.Inf)
+ result.getStatus match {
+ case Status.ERROR =>
+ status = false
+ messages.append(
+ s"executor : {}, failed with message: {};",
+ job._1,
+ result.getMessage)
+ complete = true
+ case Status.SUCCESS =>
+ complete = true
+ case _ =>
+ // still running
+ }
+ }
+ }
+ })
+ (status, messages.mkString(";"))
+ }
+
+ def collectJobTriggerResult(
+ jobs: ArrayBuffer[(String, CacheJobInfo)]): (Boolean,
ArrayBuffer[String]) = {
+ var status = true
+ val messages = ArrayBuffer[String]()
+ jobs.foreach(
+ job => {
+ if (!job._2.status) {
+ messages.append(job._2.reason)
+ status = false
+ }
+ })
+ (status, messages)
+ }
+
+ def getResult(
+ futureList: ArrayBuffer[(String, Future[CacheJobInfo])],
+ async: Boolean): Seq[Row] = {
+ val resultList = waitRpcResults(futureList)
+ if (async) {
+ val res = collectJobTriggerResult(resultList)
+ Seq(Row(res._1, res._2.mkString(";")))
+ } else {
+ val fetchStatus: (String, String) => Future[CacheResult] =
+ (executorId: String, jobId: String) => {
+ GlutenDriverEndpoint.executorDataMap
+ .get(toExecutorId(executorId))
+ .executorEndpointRef
+ .ask[CacheResult](GlutenCacheLoadStatus(jobId))
+ }
+ val res = waitAllJobFinish(resultList, fetchStatus)
+ Seq(Row(res._1, res._2))
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
new file mode 100644
index 000000000..0a08df7ce
--- /dev/null
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCacheFilesCommand.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.commands
+
+import org.apache.gluten.substrait.rel.LocalFilesBuilder
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+
+import org.apache.spark.affinity.CHAffinity
+import org.apache.spark.rpc.GlutenDriverEndpoint
+import org.apache.spark.rpc.GlutenRpcMessages.{CacheJobInfo,
GlutenFilesCacheLoad}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.commands.GlutenCacheBase._
+import org.apache.spark.sql.types.{BooleanType, StringType}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+
+import java.io.FileNotFoundException
+import java.lang.{Long => JLong}
+import java.util.{ArrayList => JArrayList, HashMap => JHashMap, Map => JMap}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+
+case class GlutenCacheFilesCommand(
+ async: Boolean,
+ selectedColumn: Option[Seq[String]],
+ filePath: String,
+ propertyOverrides: Map[String, String]
+) extends LeafRunnableCommand {
+
+ override def output: Seq[Attribute] = Seq(
+ AttributeReference("result", BooleanType, nullable = false)(),
+ AttributeReference("reason", StringType, nullable = false)())
+
+ override def run(session: SparkSession): Seq[Row] = {
+ val targetFile = new Path(filePath)
+ val hadoopConf: Configuration = session.sparkContext.hadoopConfiguration
+ val fs = targetFile.getFileSystem(hadoopConf)
+ if (!fs.exists(targetFile)) {
+ throw new FileNotFoundException(filePath)
+ }
+
+ val recursive =
+ if ("true".equalsIgnoreCase(propertyOverrides.getOrElse("recursive",
"false"))) {
+ true
+ } else {
+ false
+ }
+
+ val files: Seq[FileStatus] = listFiles(targetFile, recursive, fs)
+ val executorIdsToFiles =
+ scala.collection.mutable.Map[String, ArrayBuffer[FileStatus]]()
+ executorIdsToFiles.put(ALL_EXECUTORS, new ArrayBuffer[FileStatus]())
+
+ files.foreach(
+ fileStatus => {
+ val locations =
CHAffinity.getHostLocations(fileStatus.getPath.toUri.toASCIIString)
+ if (locations.isEmpty) {
+ executorIdsToFiles(ALL_EXECUTORS).append(fileStatus)
+ } else {
+ locations.foreach(
+ executor => {
+ if (!executorIdsToFiles.contains(executor)) {
+ executorIdsToFiles.put(executor, new ArrayBuffer[FileStatus]())
+ }
+ executorIdsToFiles(executor).append(fileStatus)
+ })
+ }
+ })
+
+ val executorIdsToLocalFiles = executorIdsToFiles
+ .filter(_._2.nonEmpty)
+ .map {
+ case (executorId, fileStatusArray) =>
+ val paths = new JArrayList[String]()
+ val starts = new JArrayList[JLong]()
+ val lengths = new JArrayList[JLong]()
+ val partitionColumns = new JArrayList[JMap[String, String]]
+
+ fileStatusArray.foreach(
+ fileStatus => {
+ paths.add(fileStatus.getPath.toUri.toASCIIString)
+ starts.add(JLong.valueOf(0))
+ lengths.add(JLong.valueOf(fileStatus.getLen))
+ partitionColumns.add(new JHashMap[String, String]())
+ })
+
+ val localFile = LocalFilesBuilder.makeLocalFiles(
+ null,
+ paths,
+ starts,
+ lengths,
+ lengths,
+ new JArrayList[JLong](),
+ partitionColumns,
+ new JArrayList[JMap[String, String]](),
+ ReadFileFormat.ParquetReadFormat, // ignore format in backend
+ new JArrayList[String](),
+ new JHashMap[String, String]()
+ )
+
+ (executorId, localFile)
+ }
+ .toMap
+
+ val futureList = ArrayBuffer[(String, Future[CacheJobInfo])]()
+ val fileNodeOption = executorIdsToLocalFiles.get(ALL_EXECUTORS)
+ if (fileNodeOption.isDefined) {
+ GlutenDriverEndpoint.executorDataMap.forEach(
+ (executorId, executor) => {
+ futureList.append(
+ (
+ executorId,
+ executor.executorEndpointRef.ask[CacheJobInfo](
+
GlutenFilesCacheLoad(fileNodeOption.get.toProtobuf.toByteArray))))
+ })
+ } else {
+ executorIdsToLocalFiles.foreach {
+ case (executorId, fileNode) =>
+ checkExecutorId(executorId)
+ val executor =
GlutenDriverEndpoint.executorDataMap.get(toExecutorId(executorId))
+ futureList.append(
+ (
+ executorId,
+ executor.executorEndpointRef.ask[CacheJobInfo](
+ GlutenFilesCacheLoad(fileNode.toProtobuf.toByteArray))))
+ }
+ }
+
+ getResult(futureList, async)
+ }
+
+ private def listFiles(targetFile: Path, recursive: Boolean, fs: FileSystem):
Seq[FileStatus] = {
+ val dirContents = fs
+ .listStatus(targetFile)
+ .flatMap(f => addInputPathRecursively(fs, f, recursive))
+ .filter(isNonEmptyDataFile)
+ .toSeq
+ dirContents
+ }
+
+ private def addInputPathRecursively(
+ fs: FileSystem,
+ files: FileStatus,
+ recursive: Boolean): Seq[FileStatus] = {
+ if (files.isFile) {
+ Seq(files)
+ } else if (recursive) {
+ fs.listStatus(files.getPath)
+ .flatMap(
+ file => {
+ if (file.isFile) {
+ Seq(file)
+ } else {
+ addInputPathRecursively(fs, file, recursive)
+ }
+ })
+ } else {
+ Seq()
+ }
+ }
+
+ private def isNonEmptyDataFile(f: FileStatus): Boolean = {
+ if (!f.isFile || f.getLen == 0) {
+ false
+ } else {
+ val name = f.getPath.getName
+ !((name.startsWith("_") && !name.contains("=")) || name.startsWith("."))
+ }
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index f914eaa18..dfc5fbd3b 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -41,6 +41,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
version(0) + "." + version(1)
}
+ val CH_CONFIG_PREFIX: String =
"spark.gluten.sql.columnar.backend.ch.runtime_config"
+ val CH_SETTING_PREFIX: String =
"spark.gluten.sql.columnar.backend.ch.runtime_settings"
+
val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
val S3_ENDPOINT = "s3://127.0.0.1:9000/"
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
new file mode 100644
index 000000000..ed8dcbbc1
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.tpch
+
+import org.apache.gluten.execution.{CHNativeCacheManager,
FileSourceScanExecTransformer, GlutenClickHouseTPCHAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+import org.apache.hadoop.fs.Path
+
+class GlutenClickHouseHDFSSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val tablesPath: String = HDFS_URL_ENDPOINT + "/tpch-data"
+ override protected val tpchQueries: String =
+ rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
+ override protected val queriesResults: String = rootPath + "queries-output"
+
+ private val hdfsCachePath = "/tmp/gluten_hdfs_cache/"
+ private val cache_name = "gluten_cache"
+
+ /** Run Gluten + ClickHouse Backend with SortShuffleManager */
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager", "sort")
+ .set("spark.io.compression.codec", "snappy")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set(s"$CH_CONFIG_PREFIX.use_local_format", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm",
"sparkMurmurHash3_32")
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.enabled", "true")
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.name", cache_name)
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.path", hdfsCachePath)
+ .set(s"$CH_CONFIG_PREFIX.gluten_cache.local.max_size", "10Gi")
+ .set(s"$CH_CONFIG_PREFIX.reuse_disk_cache", "false")
+ .set("spark.sql.adaptive.enabled", "false")
+ }
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ deleteCache()
+ }
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ deleteCache()
+ }
+
+ private def deleteCache(): Unit = {
+ val targetFile = new Path(tablesPath)
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ fs.listStatus(targetFile)
+ .foreach(
+ table => {
+ if (table.isDirectory) {
+ fs.listStatus(table.getPath)
+ .foreach(
+ data => {
+ if (data.isFile) {
+ CHNativeCacheManager
+ .removeFiles(data.getPath.toUri.getPath.substring(1),
cache_name)
+ }
+ })
+ }
+ })
+ clearDataPath(hdfsCachePath)
+ }
+
+ val runWithoutCache: () => Unit = () => {
+ runTPCHQuery(6) {
+ df =>
+ val plans = df.queryExecution.executedPlan.collect {
+ case scanExec: FileSourceScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans.head.metrics("readMissBytes").value != 0)
+ }
+ }
+
+ val runWithCache: () => Unit = () => {
+ runTPCHQuery(6) {
+ df =>
+ val plans = df.queryExecution.executedPlan.collect {
+ case scanExec: FileSourceScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans.head.metrics("readMissBytes").value == 0)
+ assert(plans.head.metrics("readCacheBytes").value != 0)
+ }
+ }
+
+ test("test hdfs cache") {
+ runWithoutCache()
+ runWithCache()
+ }
+
+ test("test cache file command") {
+ runSql(
+ s"CACHE FILES select * from '$HDFS_URL_ENDPOINT/tpch-data/lineitem'",
+ noFallBack = false) { _ => }
+ runWithCache()
+ }
+
+ test("test no cache by query") {
+ withSQLConf(
+
s"$CH_SETTING_PREFIX.read_from_filesystem_cache_if_exists_otherwise_bypass_cache"
-> "true") {
+ runWithoutCache()
+ }
+
+ runWithoutCache()
+ runWithCache()
+ }
+}
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 8e07eea01..9558bf957 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -573,6 +573,18 @@ std::vector<String>
BackendInitializerUtil::wrapDiskPathConfig(
std::vector<String> changed_paths;
if (path_prefix.empty() && path_suffix.empty())
return changed_paths;
+
+ auto change_func = [&](String key) -> void
+ {
+ if (const String value = config.getString(key, ""); value != "")
+ {
+ const String change_value = path_prefix + value + path_suffix;
+ config.setString(key, change_value);
+ changed_paths.emplace_back(change_value);
+ LOG_INFO(getLogger("BackendInitializerUtil"), "Change config `{}`
from '{}' to {}.", key, value, change_value);
+ }
+ };
+
Poco::Util::AbstractConfiguration::Keys disks;
std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten",
"cache"};
config.keys("storage_configuration.disks", disks);
@@ -586,26 +598,14 @@ std::vector<String>
BackendInitializerUtil::wrapDiskPathConfig(
if (!disk_types.contains(disk_type))
return;
if (disk_type == "cache")
- {
- String path = config.getString(disk_prefix + ".path", "");
- if (!path.empty())
- {
- String final_path = path_prefix + path + path_suffix;
- config.setString(disk_prefix + ".path", final_path);
- changed_paths.emplace_back(final_path);
- }
- }
+ change_func(disk_prefix + ".path");
else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
- {
- String metadata_path = config.getString(disk_prefix +
".metadata_path", "");
- if (!metadata_path.empty())
- {
- String final_path = path_prefix + metadata_path +
path_suffix;
- config.setString(disk_prefix + ".metadata_path",
final_path);
- changed_paths.emplace_back(final_path);
- }
- }
+ change_func(disk_prefix + ".metadata_path");
});
+
+ change_func("path");
+ change_func("gluten_cache.local.path");
+
return changed_paths;
}
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index c91b7264d..a92155d14 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -195,6 +195,8 @@ public:
inline static const String GLUTEN_TASK_OFFHEAP =
"spark.gluten.memory.task.offHeap.size.in.bytes";
+ inline static const String GLUTEN_LOCAL_CACHE_PREFIX =
"gluten_cache.local.";
+
/// On yarn mode, native writing on hdfs cluster takes yarn container user
as the user passed to libhdfs3, which
/// will cause permission issue because yarn container user is not the
owner of the hdfs dir to be written.
/// So we need to get the spark user from env and pass it to libhdfs3.
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h
b/cpp-ch/local-engine/Common/GlutenConfig.h
index d0e2e9dee..38c4ce162 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -20,6 +20,7 @@
#include <Interpreters/Context.h>
#include <base/types.h>
#include <base/unit.h>
+#include <Common/logger_useful.h>
namespace local_engine
{
@@ -134,13 +135,17 @@ struct HdfsConfig
{
inline static const String HDFS_ASYNC = "hdfs.enable_async_io";
-    bool hdfs_async = true;
+    /// Keep the in-class default so a default-constructed HdfsConfig never
+    /// carries an indeterminate flag; loadFromContext() always overwrites it.
+    bool hdfs_async = true;
- static HdfsConfig loadFromContext(DB::ContextPtr context)
+ static HdfsConfig loadFromContext(const Poco::Util::AbstractConfiguration
& config, const DB::ReadSettings & read_settings)
{
- HdfsConfig config;
- config.hdfs_async = context->getConfigRef().getBool(HDFS_ASYNC, true);
- return config;
+        HdfsConfig hdfs;
+        // Async IO and the local file cache are mutually exclusive here: the
+        // cached read path is synchronous, so force async off when caching.
+        hdfs.hdfs_async = !read_settings.enable_filesystem_cache && config.getBool(HDFS_ASYNC, true);
+        return hdfs;
}
};
@@ -159,10 +164,17 @@ struct S3Config
static S3Config loadFromContext(DB::ContextPtr context)
{
S3Config config;
- config.s3_local_cache_enabled =
context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
- config.s3_local_cache_max_size =
context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
- config.s3_local_cache_cache_path =
context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
- config.s3_gcs_issue_compose_request =
context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+
+        // Honour the legacy per-builder S3 cache settings only when the user
+        // explicitly configured them; `gluten_cache.local.*` supersedes them.
+        // BUG FIX: check the key named by the constant, not the literal
+        // string "S3_LOCAL_CACHE_ENABLE" (which never exists in the config),
+        // otherwise this deprecated path can never trigger.
+        if (context->getConfigRef().has(S3_LOCAL_CACHE_ENABLE))
+        {
+            LOG_WARNING(&Poco::Logger::get("S3Config"), "Config {} is deprecated.", S3_LOCAL_CACHE_ENABLE);
+
+            config.s3_local_cache_enabled = context->getConfigRef().getBool(S3_LOCAL_CACHE_ENABLE, false);
+            config.s3_local_cache_max_size = context->getConfigRef().getUInt64(S3_LOCAL_CACHE_MAX_SIZE, 100_GiB);
+            config.s3_local_cache_cache_path = context->getConfigRef().getString(S3_LOCAL_CACHE_CACHE_PATH, "");
+            config.s3_gcs_issue_compose_request = context->getConfigRef().getBool(S3_GCS_ISSUE_COMPOSE_REQUEST, false);
+        }
+
return config;
}
};
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp
b/cpp-ch/local-engine/Common/QueryContext.cpp
index e5f5dd5dc..ff9c15115 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -77,14 +77,19 @@ int64_t QueryContextManager::initializeQuery()
DB::ContextMutablePtr QueryContextManager::currentQueryContext()
{
- if (!CurrentThread::getGroup())
- {
- throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not
found.");
- }
+ auto thread_group = currentThreadGroup();
int64_t id = reinterpret_cast<int64_t>(CurrentThread::getGroup().get());
return query_map.get(id)->query_context;
}
+std::shared_ptr<DB::ThreadGroup> QueryContextManager::currentThreadGroup()
+{
+    // Fetch the calling thread's group; a missing group is a logic error.
+    auto thread_group = CurrentThread::getGroup();
+    if (!thread_group)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread group not found.");
+    return thread_group;
+}
+
+
void
QueryContextManager::logCurrentPerformanceCounters(ProfileEvents::Counters &
counters)
{
if (!CurrentThread::getGroup())
diff --git a/cpp-ch/local-engine/Common/QueryContext.h
b/cpp-ch/local-engine/Common/QueryContext.h
index 0fbf49773..4770327d1 100644
--- a/cpp-ch/local-engine/Common/QueryContext.h
+++ b/cpp-ch/local-engine/Common/QueryContext.h
@@ -30,6 +30,7 @@ public:
}
int64_t initializeQuery();
DB::ContextMutablePtr currentQueryContext();
+ static std::shared_ptr<DB::ThreadGroup> currentThreadGroup();
void logCurrentPerformanceCounters(ProfileEvents::Counters& counters);
size_t currentPeakMemory(int64_t id);
void finalizeQuery(int64_t id);
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 7b8b4cfd9..e13864260 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -15,15 +15,63 @@
* limitations under the License.
*/
#include "RelMetric.h"
+
#include <Processors/IProcessor.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
+#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
+#include <Common/QueryContext.h>
using namespace rapidjson;
+namespace ProfileEvents
+{
+extern const Event FileSegmentWaitReadBufferMicroseconds;
+extern const Event FileSegmentReadMicroseconds;
+extern const Event FileSegmentCacheWriteMicroseconds;
+extern const Event FileSegmentPredownloadMicroseconds;
+extern const Event FileSegmentUsedBytes;
+
+extern const Event CachedReadBufferReadFromSourceMicroseconds;
+extern const Event CachedReadBufferReadFromCacheMicroseconds;
+extern const Event CachedReadBufferCacheWriteMicroseconds;
+extern const Event CachedReadBufferReadFromSourceBytes;
+extern const Event CachedReadBufferReadFromCacheBytes;
+extern const Event CachedReadBufferCacheWriteBytes;
+extern const Event CachedReadBufferCreateBufferMicroseconds;
+
+extern const Event CachedReadBufferReadFromCacheHits;
+extern const Event CachedReadBufferReadFromCacheMisses;
+}
+
namespace local_engine
{
+static void writeCacheHits(Writer<StringBuffer> & writer)
+{
+    // Emit the file-cache hit/miss profile counters of the current thread
+    // group as JSON key/value pairs; timings are reported in milliseconds.
+    const auto thread_group = QueryContextManager::currentThreadGroup();
+    auto & counters = thread_group->performance_counters;
+
+    const auto emit = [&writer](const char * key, UInt64 value)
+    {
+        writer.Key(key);
+        writer.Uint64(value);
+    };
+
+    emit("read_cache_hits", counters[ProfileEvents::CachedReadBufferReadFromCacheHits].load());
+    emit("miss_cache_hits", counters[ProfileEvents::CachedReadBufferReadFromCacheMisses].load());
+    emit("read_cache_bytes", counters[ProfileEvents::CachedReadBufferReadFromCacheBytes].load());
+    emit("read_miss_bytes", counters[ProfileEvents::CachedReadBufferReadFromSourceBytes].load());
+    emit("read_cache_millisecond", counters[ProfileEvents::CachedReadBufferReadFromCacheMicroseconds].load() / 1000);
+    emit("miss_cache_millisecond", counters[ProfileEvents::CachedReadBufferReadFromSourceMicroseconds].load() / 1000);
+}
+
RelMetric::RelMetric(size_t id_, const String & name_,
std::vector<DB::IQueryPlanStep *> & steps_) : id(id_), name(name_),
steps(steps_)
{
}
@@ -117,7 +165,7 @@ void RelMetric::serialize(Writer<StringBuffer> & writer,
bool) const
}
writer.EndArray();
- if (auto read_mergetree =
dynamic_cast<DB::ReadFromMergeTree*>(step))
+ if (auto read_mergetree = dynamic_cast<DB::ReadFromMergeTree
*>(step))
{
auto selected_marks_pk =
read_mergetree->getAnalysisResult().selected_marks_pk;
auto selected_marks =
read_mergetree->getAnalysisResult().selected_marks;
@@ -128,6 +176,11 @@ void RelMetric::serialize(Writer<StringBuffer> & writer,
bool) const
writer.Uint64(selected_marks);
writer.Key("total_marks_pk");
writer.Uint64(total_marks_pk);
+ writeCacheHits(writer);
+ }
+ else if (dynamic_cast<SubstraitFileSourceStep *>(step))
+ {
+ writeCacheHits(writer);
}
writer.EndObject();
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index a97f0c72a..0dc852a90 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -16,20 +16,21 @@
*/
#include "CacheManager.h"
+#include <ranges>
#include <Core/Settings.h>
#include <Disks/IStoragePolicy.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
-#include <Interpreters/Context.h>
+#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
-#include <Storages/Mergetree/MetaDataHelper.h>
-#include <Common/ThreadPool.h>
+#include <Interpreters/Context.h>
#include <Parser/MergeTreeRelParser.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
#include <Common/Logger.h>
+#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
-#include <ranges>
#include <jni/jni_common.h>
@@ -178,4 +179,62 @@ jobject CacheManager::getCacheStatus(JNIEnv * env, const
String & jobId)
}
return env->NewObject(cache_result_class, cache_result_constructor,
status, charTojstring(env, message.c_str()));
}
+
+Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder)
+{
+    // Build a task that pulls the whole file through the (cache-backed) read
+    // buffer, which populates the local file cache as a side effect.
+    auto task = [file, read_buffer_builder, context = this->context]()
+    {
+        LOG_INFO(getLogger("CacheManager"), "Loading cache file {}", file.uri_file());
+
+        try
+        {
+            std::unique_ptr<DB::ReadBuffer> rb = read_buffer_builder->build(file);
+            while (!rb->eof())
+                rb->ignoreAll();
+        }
+        catch (const std::exception & e)
+        {
+            LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n {}", file.uri_file(), e.what());
+            // `throw;` rethrows the in-flight exception directly; it replaces
+            // the roundabout std::rethrow_exception(std::current_exception()).
+            throw;
+        }
+    };
+
+    // Return by value: moving a named local blocks NRVO (pessimizing-move).
+    return task;
+}
+
+JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
+{
+    // Schedule one async task per file; returns the job id the caller can
+    // poll for status. Scheme of the first file selects the builder.
+    JobId id = toString(UUIDHelpers::generateV4());
+    Job job(id);
+
+    if (file_infos.items_size())
+    {
+        const Poco::URI first_uri(file_infos.items().Get(0).uri_file());
+        const auto builder = ReadBufferBuilderFactory::instance().createBuilder(first_uri.getScheme(), context);
+
+        if (!builder->file_cache)
+            LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache not enabled.");
+        else
+            for (const auto & file : file_infos.items())
+                job.addTask(cacheFile(file, builder));
+    }
+
+    JobScheduler::instance().scheduleJob(std::move(job));
+    return id;
+}
+
+/// Evict `file` from the file cache registered under `cache_name`.
+/// Caches with other names are untouched; a missing path is a no-op.
+void CacheManager::removeFiles(String file, String cache_name)
+{
+    // only for ut
+    for (const auto & [name, file_cache] : FileCacheFactory::instance().getAll())
+    {
+        if (name != cache_name)
+            continue;
+
+        if (const auto cache = file_cache->cache)
+            cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id);
+    }
+}
+
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index b88a3ea03..6335f86bb 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -15,9 +15,13 @@
* limitations under the License.
*/
#pragma once
+
+#include <substrait/plan.pb.h>
+
#include <Disks/IDisk.h>
#include <Storages/Cache/JobScheduler.h>
#include <jni.h>
+#include <Storages/SubstraitSource/ReadBufferBuilder.h>
namespace local_engine
{
@@ -40,6 +44,10 @@ public:
Task cachePart(const MergeTreeTable& table, const MergeTreePart& part,
const std::unordered_set<String>& columns);
JobId cacheParts(const String& table_def, const
std::unordered_set<String>& columns);
static jobject getCacheStatus(JNIEnv * env, const String& jobId);
+
+ Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file,
ReadBufferBuilderPtr read_buffer_builder);
+ JobId cacheFiles(substrait::ReadRel::LocalFiles file_infos);
+ static void removeFiles(String file, String cache_name);
private:
CacheManager() = default;
DB::ContextMutablePtr context;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
index da1589007..b32073db5 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
@@ -35,7 +35,7 @@
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Cache/FileCacheSettings.h>
-#include <Interpreters/Context_fwd.h>
+#include <Interpreters/Context.h>
#include <Storages/ObjectStorage/HDFS/AsynchronousReadBufferFromHDFS.h>
#include <Storages/ObjectStorage/HDFS/HDFSCommon.h>
#include <Storages/ObjectStorage/HDFS/ReadBufferFromHDFS.h>
@@ -49,7 +49,6 @@
#include <Common/CHUtil.h>
#include <Common/FileCacheConcurrentMap.h>
#include <Common/GlutenConfig.h>
-#include <Common/Throttler.h>
#include <Common/logger_useful.h>
#include <Common/safe_cast.h>
@@ -77,8 +76,6 @@ namespace ErrorCodes
}
}
-namespace fs = std::filesystem;
-
namespace local_engine
{
template <class key_type, class value_type>
@@ -205,6 +202,7 @@ public:
#if USE_HDFS
class HDFSFileReadBufferBuilder : public ReadBufferBuilder
{
+ using ReadBufferCreator =
std::function<std::unique_ptr<DB::ReadBufferFromFileBase>(bool restricted_seek,
const DB::StoredObject & object)>;
public:
explicit HDFSFileReadBufferBuilder(DB::ContextPtr context_) :
ReadBufferBuilder(context_), context(context_) { }
~HDFSFileReadBufferBuilder() override = default;
@@ -212,18 +210,21 @@ public:
std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool
set_read_util_position) override
{
- auto config = HdfsConfig::loadFromContext(context);
+ DB::ReadSettings read_settings = getReadSettings(context);
+ auto & config = context->getConfigRef();
+ auto hdfs_config = HdfsConfig::loadFromContext(config, read_settings);
Poco::URI file_uri(file_info.uri_file());
std::string uri_path = "hdfs://" + file_uri.getHost();
if (file_uri.getPort())
- uri_path += ":" + std::to_string(file_uri.getPort());
+ uri_path += ":" +
std::to_string(static_cast<unsigned>(file_uri.getPort()));
- DB::ReadSettings read_settings;
- std::unique_ptr<DB::ReadBuffer> read_buffer;
+ size_t read_util_position = 0;
+ size_t read_begin = 0;
if (set_read_util_position)
{
std::pair<size_t, size_t> start_end_pos
- = adjustFileReadStartAndEndPos(file_info.start(),
file_info.start() + file_info.length(), uri_path, file_uri.getPath());
+ = adjustFileReadStartAndEndPos(file_info.start(),
file_info.start() + file_info.length(), uri_path, file_uri.getPath());
+
LOG_DEBUG(
&Poco::Logger::get("ReadBufferBuilder"),
"File read start and end position adjusted from {},{} to
{},{}",
@@ -232,34 +233,57 @@ public:
start_end_pos.first,
start_end_pos.second);
- auto read_buffer_impl = std::make_unique<DB::ReadBufferFromHDFS>(
- uri_path, file_uri.getPath(), context->getConfigRef(),
read_settings, start_end_pos.second, true);
- if (config.hdfs_async)
- {
- auto & pool_reader =
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
- read_buffer =
std::make_unique<DB::AsynchronousReadBufferFromHDFS>(pool_reader,
read_settings, std::move(read_buffer_impl));
- }
- else
- read_buffer = std::move(read_buffer_impl);
+ read_begin = start_end_pos.first;
+ read_util_position = start_end_pos.second;
+ }
- if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer
*>(read_buffer.get()))
- if (start_end_pos.first)
- seekable_in->seek(start_end_pos.first, SEEK_SET);
+ size_t file_size = 0;
+ if (file_info.has_properties())
+ file_size = file_info.properties().filesize();
+
+ std::unique_ptr<DB::ReadBuffer> read_buffer;
+
+ if (hdfs_config.hdfs_async)
+ {
+ std::optional<size_t> size = std::nullopt;
+ if (file_size)
+ size = file_size;
+
+ auto read_buffer_impl = std::make_shared<DB::ReadBufferFromHDFS>(
+ uri_path, file_uri.getPath(), config,
read_settings, read_util_position, true, size);
+ auto & pool_reader =
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
+ read_buffer =
std::make_unique<DB::AsynchronousReadBufferFromHDFS>(pool_reader,
read_settings, std::move(read_buffer_impl));
}
else
{
- auto read_buffer_impl
- = std::make_unique<DB::ReadBufferFromHDFS>(uri_path,
file_uri.getPath(), context->getConfigRef(), read_settings, 0, true);
- if (config.hdfs_async)
+ if (!file_size)
{
- read_buffer =
std::make_unique<DB::AsynchronousReadBufferFromHDFS>(
-
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER),
- read_settings,
- std::move(read_buffer_impl));
+                // Only for Spark 3.2: its file partitions do not carry the
+                // file size, so compute it here first.
+ auto read_buffer_impl =
std::make_unique<DB::ReadBufferFromHDFS>(
+ uri_path, file_uri.getPath(), config, read_settings,
read_util_position, true);
+ file_size = read_buffer_impl->getFileSize();
}
- else
- read_buffer = std::move(read_buffer_impl);
+
+ ReadBufferCreator hdfs_read_buffer_creator
+ = [this, hdfs_uri = uri_path, hdfs_file_path =
file_uri.getPath(), read_settings, &config, read_util_position](
+ bool /* restricted_seek */, const DB::StoredObject &
object) -> std::unique_ptr<DB::ReadBufferFromHDFS>
+ {
+ return std::make_unique<DB::ReadBufferFromHDFS>(
+ hdfs_uri, hdfs_file_path, config, read_settings,
read_util_position, true, object.bytes_size);
+ };
+
+ DB::StoredObjects
stored_objects{DB::StoredObject{file_uri.getPath().substr(1), "", file_size}};
+ auto cache_hdfs_read =
std::make_unique<DB::ReadBufferFromRemoteFSGather>(
+ std::move(hdfs_read_buffer_creator), stored_objects, "hdfs:",
read_settings, nullptr, /* use_external_buffer */ false);
+ cache_hdfs_read->setReadUntilPosition(read_util_position);
+ read_buffer = std::move(cache_hdfs_read);
}
+
+ if (set_read_util_position && read_begin)
+ if (auto * seekable_in = dynamic_cast<DB::SeekableReadBuffer
*>(read_buffer.get()))
+ seekable_in->seek(read_begin, SEEK_SET);
+
return read_buffer;
}
@@ -367,6 +391,7 @@ public:
result.second = get_next_line_pos(fs.get(), fin, read_end_pos,
hdfs_file_size);
return result;
}
+
private:
DB::ContextPtr context;
};
@@ -382,23 +407,19 @@ public:
explicit S3FileReadBufferBuilder(DB::ContextPtr context_) :
ReadBufferBuilder(context_)
{
auto config = S3Config::loadFromContext(context);
- new_settings = context->getReadSettings();
- new_settings.enable_filesystem_cache = config.s3_local_cache_enabled;
-
- if (new_settings.enable_filesystem_cache)
+        // The gluten cache config takes priority over the legacy S3 cache settings.
+ if (!file_cache && config.s3_local_cache_enabled)
{
DB::FileCacheSettings file_cache_settings;
file_cache_settings.max_size = config.s3_local_cache_max_size;
auto cache_base_path = config.s3_local_cache_cache_path;
- if (!fs::exists(cache_base_path))
- fs::create_directories(cache_base_path);
+ if (!std::filesystem::exists(cache_base_path))
+ std::filesystem::create_directories(cache_base_path);
file_cache_settings.base_path = cache_base_path;
file_cache =
DB::FileCacheFactory::instance().getOrCreate("s3_local_cache",
file_cache_settings, "");
file_cache->initialize();
-
- new_settings.remote_fs_cache = file_cache;
}
}
@@ -407,6 +428,7 @@ public:
std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool
set_read_util_position) override
{
+ DB::ReadSettings read_settings = getReadSettings(context);
Poco::URI file_uri(file_info.uri_file());
// file uri looks like: s3a://my-dev-bucket/tpch100/part/0001.parquet
const std::string& bucket = file_uri.getHost();
@@ -416,9 +438,8 @@ public:
size_t object_size = object_info.size;
Int64 object_modified_time = object_info.last_modification_time;
- if (new_settings.enable_filesystem_cache)
+ if (read_settings.enable_filesystem_cache)
{
-
auto file_cache_key = DB::FileCacheKey(key);
auto last_cache_time = files_cache_time_map.get(file_cache_key);
// quick check
@@ -436,7 +457,7 @@ public:
}
auto read_buffer_creator
- = [bucket, client, this](bool restricted_seek, const
DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
+ = [bucket, client, read_settings, this](bool restricted_seek,
const DB::StoredObject & object) -> std::unique_ptr<DB::ReadBufferFromFileBase>
{
return std::make_unique<DB::ReadBufferFromS3>(
client,
@@ -444,7 +465,7 @@ public:
object.remote_path,
"",
DB::S3::RequestSettings(),
- new_settings,
+ read_settings,
/* use_external_buffer */ true,
/* offset */ 0,
/* read_until_position */0,
@@ -453,11 +474,11 @@ public:
DB::StoredObjects stored_objects{DB::StoredObject{key, "",
object_size}};
auto s3_impl = std::make_unique<DB::ReadBufferFromRemoteFSGather>(
- std::move(read_buffer_creator), stored_objects, "s3:" + bucket +
"/", new_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
+ std::move(read_buffer_creator), stored_objects, "s3:" + bucket +
"/", read_settings, /* cache_log */ nullptr, /* use_external_buffer */ true);
auto & pool_reader =
context->getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
auto async_reader
- =
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl),
pool_reader, new_settings, nullptr, nullptr);
+ =
std::make_unique<DB::AsynchronousBoundedReadBuffer>(std::move(s3_impl),
pool_reader, read_settings, nullptr, nullptr);
if (set_read_util_position)
{
@@ -478,7 +499,7 @@ public:
async_reader->setReadUntilEnd();
}
- if (new_settings.remote_fs_prefetch)
+ if (read_settings.remote_fs_prefetch)
async_reader->prefetch(Priority{});
return async_reader;
@@ -488,7 +509,6 @@ private:
static const std::string SHARED_CLIENT_KEY;
static ConcurrentLRU<std::string, std::shared_ptr<DB::S3::Client>>
per_bucket_clients;
static FileCacheConcurrentMap files_cache_time_map;
- DB::ReadSettings new_settings;
DB::FileCachePtr file_cache;
std::string & stripQuote(std::string & s)
@@ -732,6 +752,57 @@ void registerReadBufferBuilders()
#endif
}
+/// Creates (or reuses) the shared local file cache when
+/// `gluten_cache.local.enabled` is set; remote reads made through this
+/// builder are then routed through that cache via getReadSettings().
+ReadBufferBuilder::ReadBufferBuilder(DB::ContextPtr context_) : context(context_)
+{
+    const auto & config = context->getConfigRef();
+    if (config.getBool("gluten_cache.local.enabled", false))
+    {
+        DB::FileCacheSettings file_cache_settings;
+
+        file_cache_settings.loadFromConfig(config, "gluten_cache.local");
+
+        // Relative cache paths are anchored under the server data path.
+        if (std::filesystem::path(file_cache_settings.base_path).is_relative())
+            file_cache_settings.base_path = std::filesystem::path(context->getPath()) / "caches" / file_cache_settings.base_path;
+
+        if (!std::filesystem::exists(file_cache_settings.base_path))
+            std::filesystem::create_directories(file_cache_settings.base_path);
+
+        // NOTE(review): getString without a default throws when `name` is
+        // absent while `enabled` is true — presumably intentional; confirm.
+        auto name = config.getString("gluten_cache.local.name");
+        auto * config_prefix = "";
+        file_cache = DB::FileCacheFactory::instance().getOrCreate(name, file_cache_settings, config_prefix);
+        file_cache->initialize();
+    }
+}
+
+DB::ReadSettings ReadBufferBuilder::getReadSettings(DB::ContextPtr context) const
+{
+    // Route reads through the gluten local file cache when one was created
+    // in the constructor; otherwise disable filesystem caching explicitly.
+    DB::ReadSettings settings = context->getReadSettings();
+    settings.enable_filesystem_cache = static_cast<bool>(file_cache);
+    if (file_cache)
+        settings.remote_fs_cache = file_cache;
+    return settings;
+}
+
+
+std::unique_ptr<DB::ReadBuffer>
+ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position)
+{
+    // Build the plain read buffer, then transparently decompress when the
+    // file path's extension implies a compression method.
+    auto in = build(file_info, set_read_util_position);
+
+    const Poco::URI uri(file_info.uri_file());
+    const auto method = DB::chooseCompressionMethod(uri.getPath(), "auto");
+    if (method == DB::CompressionMethod::None)
+        return in;
+    return DB::wrapReadBufferWithCompressionMethod(std::move(in), method);
+}
+
+
ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
static ReadBufferBuilderFactory instance;
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
index f5218f0aa..92d8d41c1 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.h
@@ -15,21 +15,21 @@
* limitations under the License.
*/
#pragma once
+
#include <functional>
#include <memory>
#include <IO/ReadBuffer.h>
-#include <Interpreters/Context.h>
-#include <Interpreters/Context_fwd.h>
-#include <boost/core/noncopyable.hpp>
#include <substrait/plan.pb.h>
-#include <Poco/URI.h>
+
namespace local_engine
{
+
class ReadBufferBuilder
{
public:
- explicit ReadBufferBuilder(DB::ContextPtr context_) : context(context_) { }
+ explicit ReadBufferBuilder(DB::ContextPtr context_);
+
virtual ~ReadBufferBuilder() = default;
/// build a new read buffer
@@ -37,19 +37,14 @@ public:
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool
set_read_util_position = false) = 0;
/// build a new read buffer, consider compression method
- std::unique_ptr<DB::ReadBuffer> buildWithCompressionWrapper(const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool
set_read_util_position = false)
- {
- auto in = build(file_info, set_read_util_position);
-
- /// Wrap the read buffer with compression method if exists
- Poco::URI file_uri(file_info.uri_file());
- DB::CompressionMethod compression =
DB::chooseCompressionMethod(file_uri.getPath(), "auto");
- return compression != DB::CompressionMethod::None ?
DB::wrapReadBufferWithCompressionMethod(std::move(in), compression)
- : std::move(in);
- }
+ std::unique_ptr<DB::ReadBuffer> buildWithCompressionWrapper(const
substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool
set_read_util_position = false);
protected:
+ DB::ReadSettings getReadSettings(DB::ContextPtr context) const;
DB::ContextPtr context;
+
+public:
+ DB::FileCachePtr file_cache = nullptr;
};
using ReadBufferBuilderPtr = std::shared_ptr<ReadBufferBuilder>;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 3c3d6d4f8..f27da2f92 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1292,6 +1292,30 @@ JNIEXPORT jobject
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeGe
return local_engine::CacheManager::instance().getCacheStatus(env,
jstring2string(env, id));
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
+
+/// JNI entry point for CHNativeCacheManager.nativeCacheFiles.
+/// `files` is a serialized substrait::ReadRel::LocalFiles message; returns
+/// the async job id, whose progress is polled via nativeGetCacheStatus.
+JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheFiles(JNIEnv * env, jobject, jbyteArray files)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    const auto files_bytes = local_engine::getByteArrayElementsSafe(env, files);
+    const std::string::size_type files_bytes_size = files_bytes.length();
+    std::string_view files_view = {reinterpret_cast<const char *>(files_bytes.elems()), files_bytes_size};
+    substrait::ReadRel::LocalFiles local_files = local_engine::BinaryToMessage<substrait::ReadRel::LocalFiles>(files_view);
+
+    auto jobId = local_engine::CacheManager::instance().cacheFiles(local_files);
+    return local_engine::charTojstring(env, jobId.c_str());
+    LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
+}
+
+/// JNI entry point for CHNativeCacheManager.removeFiles.
+/// Test-only hook: evicts `file_` from the local file cache named `cache_name_`.
+JNIEXPORT void Java_org_apache_gluten_execution_CHNativeCacheManager_removeFiles(JNIEnv * env, jobject, jstring file_, jstring cache_name_)
+{
+    LOCAL_ENGINE_JNI_METHOD_START
+    auto file = jstring2string(env, file_);
+    auto cache_name = jstring2string(env, cache_name_);
+
+    local_engine::CacheManager::removeFiles(file, cache_name);
+    LOCAL_ENGINE_JNI_METHOD_END(env, );
+}
+
#ifdef __cplusplus
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]