This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d099a7941 [spark] Fix analyze compatibility with spark3.3- (#2835)
d099a7941 is described below
commit d099a7941141e97e0f0e122587eed4f21c8946fd
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Feb 2 15:52:41 2024 +0800
[spark] Fix analyze compatibility with spark3.3- (#2835)
---
.../scala/org/apache/spark/sql/StatsUtils.scala | 70 +++++++++++++++++++++
.../scala/org/apache/spark/sql/StatsUtils.scala | 71 ++++++++++++++++++++++
.../apache/paimon/spark/sql/AnalyzeTableTest.scala | 23 +++++++
.../apache/paimon/spark/sql/AnalyzeTableTest.scala | 41 +------------
.../apache/paimon/spark/sql/AnalyzeTableTest.scala | 43 +------------
.../apache/paimon/spark/sql/AnalyzeTableTest.scala | 43 +------------
.../commands/PaimonAnalyzeTableColumnCommand.scala | 14 +++--
.../spark/sql/{Utils.scala => StatsUtils.scala} | 48 +--------------
.../main/scala/org/apache/spark/sql/Utils.scala | 47 +-------------
.../paimon/spark/sql/AnalyzeTableTestBase.scala | 43 +++++++++++++
10 files changed, 221 insertions(+), 222 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/StatsUtils.scala
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/StatsUtils.scala
new file mode 100644
index 000000000..8734e7563
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/StatsUtils.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType,
TimestampType}
+
+import java.net.URI
+
+/**
+ * Analyze helpers for Spark 3.1. The common module's version matches on `DatetimeType`, which is
+ * not available in this Spark version, so the datetime types are matched explicitly here.
+ */
+object StatsUtils {
+
+  /** Thin wrapper over Spark's `CommandUtils.calculateSingleLocationSize`. */
+  def calculateTotalSize(
+      sessionState: SessionState,
+      tableName: String,
+      locationUri: Option[URI]): Long = {
+    CommandUtils.calculateSingleLocationSize(
+      sessionState,
+      new TableIdentifier(tableName),
+      locationUri)
+  }
+
+  /** Thin wrapper over Spark's `CommandUtils.computeColumnStats`. */
+  def computeColumnStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
+    CommandUtils.computeColumnStats(sparkSession, relation, columns)
+  }
+
+  /**
+   * Whether ANALYZE can collect column statistics for `dataType`: every type that has min/max
+   * statistics, plus binary and string.
+   */
+  def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
+    case BinaryType | StringType => true
+    case other => hasMinMax(other)
+  }
+
+  /** Whether the column statistics of `dataType` include min/max values. */
+  def hasMinMax(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType | _: DecimalType => true
+    case DoubleType | FloatType | BooleanType => true
+    case DateType | TimestampType => true
+    case _ => false
+  }
+}
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/StatsUtils.scala
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/StatsUtils.scala
new file mode 100644
index 000000000..44aaa6b3b
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/StatsUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType,
TimestampNTZType, TimestampType}
+
+import java.net.URI
+
+/**
+ * Analyze helpers for Spark 3.2. The common module's version matches on `DatetimeType`, which is
+ * not available in this Spark version, so the datetime types are matched explicitly here.
+ */
+object StatsUtils {
+
+  /** Thin wrapper over Spark's `CommandUtils.calculateSingleLocationSize`. */
+  def calculateTotalSize(
+      sessionState: SessionState,
+      tableName: String,
+      locationUri: Option[URI]): Long = {
+    CommandUtils.calculateSingleLocationSize(
+      sessionState,
+      new TableIdentifier(tableName),
+      locationUri)
+  }
+
+  /** Thin wrapper over Spark's `CommandUtils.computeColumnStats`. */
+  def computeColumnStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
+    CommandUtils.computeColumnStats(sparkSession, relation, columns)
+  }
+
+  /**
+   * Whether ANALYZE can collect column statistics for `dataType`: every type that has min/max
+   * statistics, plus binary and string.
+   */
+  def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
+    case BinaryType | StringType => true
+    case other => hasMinMax(other)
+  }
+
+  /** Whether the column statistics of `dataType` include min/max values. */
+  def hasMinMax(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType | _: DecimalType => true
+    case DoubleType | FloatType | BooleanType => true
+    case DateType | TimestampType => true
+    case TimestampNTZType => true
+    case _ => false
+  }
+}
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
new file mode 100644
index 000000000..2b5f10297
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+/** Analyze tests for Spark 3.2, which does not use column stats of v2 tables. */
+class AnalyzeTableTest extends AnalyzeTableTestBase {
+
+ override protected def supportsColStats(): Boolean = false
+}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
index ff65336a1..2b5f10297 100644
---
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -17,46 +17,7 @@
*/
package org.apache.paimon.spark.sql
-import org.junit.jupiter.api.Assertions
-
+/** Analyze tests for Spark 3.3, which does not use column stats of v2 tables. */
class AnalyzeTableTest extends AnalyzeTableTestBase {
- test("Paimon analyze: spark use col stats") {
- spark.sql(s"""
- |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
- |USING PAIMON
- |TBLPROPERTIES ('primary-key'='id')
- |""".stripMargin)
-
- spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
- spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
- spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
- val stats = getScanStatistic("SELECT * FROM T")
- Assertions.assertEquals(2L, stats.rowCount.get.longValue())
- // spark 33' v2 table not support col stats
- Assertions.assertEquals(0, stats.attributeStats.size)
- }
-
- test("Paimon analyze: partition filter push down hit") {
- spark.sql(s"""
- |CREATE TABLE T (id INT, name STRING, pt INT)
- |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
- |PARTITIONED BY (pt)
- |""".stripMargin)
-
- spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4,
'd', 3)")
- spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
- // partition push down hit
- var sql = "SELECT * FROM T WHERE pt < 1"
- // spark 33' v2 table not support col stats
- Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
- checkAnswer(spark.sql(sql), Nil)
-
- // partition push down not hit
- sql = "SELECT * FROM T WHERE id < 1"
- Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
- checkAnswer(spark.sql(sql), Nil)
- }
+ override protected def supportsColStats(): Boolean = false
}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
index 723b8082b..c6f788eab 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -17,45 +17,4 @@
*/
package org.apache.paimon.spark.sql
-import org.junit.jupiter.api.Assertions
-
-class AnalyzeTableTest extends AnalyzeTableTestBase {
-
- test("Paimon analyze: spark use col stats") {
- spark.sql(s"""
- |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
- |USING PAIMON
- |TBLPROPERTIES ('primary-key'='id')
- |""".stripMargin)
-
- spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
- spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
- spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
- val stats = getScanStatistic("SELECT * FROM T")
- Assertions.assertEquals(2L, stats.rowCount.get.longValue())
- Assertions.assertEquals(4, stats.attributeStats.size)
- }
-
- test("Paimon analyze: partition filter push down hit") {
- spark.sql(s"""
- |CREATE TABLE T (id INT, name STRING, pt INT)
- |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
- |PARTITIONED BY (pt)
- |""".stripMargin)
-
- spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4,
'd', 3)")
- spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
- // paimon will reserve partition filter and not return it to spark, we
need to ensure stats are filtered correctly.
- // partition push down hit
- var sql = "SELECT * FROM T WHERE pt < 1"
- Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue())
- checkAnswer(spark.sql(sql), Nil)
-
- // partition push down not hit
- sql = "SELECT * FROM T WHERE id < 1"
- Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
- checkAnswer(spark.sql(sql), Nil)
- }
-}
+/** Analyze tests for Spark 3.4, which uses column stats of v2 tables (the base-class default). */
+class AnalyzeTableTest extends AnalyzeTableTestBase
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
index 723b8082b..c6f788eab 100644
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -17,45 +17,4 @@
*/
package org.apache.paimon.spark.sql
-import org.junit.jupiter.api.Assertions
-
-class AnalyzeTableTest extends AnalyzeTableTestBase {
-
- test("Paimon analyze: spark use col stats") {
- spark.sql(s"""
- |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
- |USING PAIMON
- |TBLPROPERTIES ('primary-key'='id')
- |""".stripMargin)
-
- spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
- spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
- spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
- val stats = getScanStatistic("SELECT * FROM T")
- Assertions.assertEquals(2L, stats.rowCount.get.longValue())
- Assertions.assertEquals(4, stats.attributeStats.size)
- }
-
- test("Paimon analyze: partition filter push down hit") {
- spark.sql(s"""
- |CREATE TABLE T (id INT, name STRING, pt INT)
- |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
- |PARTITIONED BY (pt)
- |""".stripMargin)
-
- spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4,
'd', 3)")
- spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
- // paimon will reserve partition filter and not return it to spark, we
need to ensure stats are filtered correctly.
- // partition push down hit
- var sql = "SELECT * FROM T WHERE pt < 1"
- Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue())
- checkAnswer(spark.sql(sql), Nil)
-
- // partition push down not hit
- sql = "SELECT * FROM T WHERE id < 1"
- Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
- checkAnswer(spark.sql(sql), Nil)
- }
-}
+/** Analyze tests for Spark 3.5, which uses column stats of v2 tables (the base-class default). */
+class AnalyzeTableTest extends AnalyzeTableTestBase
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index e9355a5cb..a6576e807 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.BatchWriteBuilder
import org.apache.parquet.Preconditions
-import org.apache.spark.sql.{Row, SparkSession, Utils}
+import org.apache.spark.sql.{Row, SparkSession, StatsUtils}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -56,11 +56,12 @@ case class PaimonAnalyzeTableColumnCommand(
// compute stats
val attributes = getColumnsToAnalyze(relation, columnNames, allColumns)
- val totalSize = Utils.calculateTotalSize(
+ val totalSize = StatsUtils.calculateTotalSize(
sparkSession.sessionState,
table.name(),
Some(table.location().toUri))
- val (mergedRecordCount, colStats) = Utils.computeColumnStats(sparkSession,
relation, attributes)
+ val (mergedRecordCount, colStats) =
+ StatsUtils.computeColumnStats(sparkSession, relation, attributes)
val totalRecordCount = currentSnapshot.totalRecordCount()
Preconditions.checkState(
@@ -111,7 +112,7 @@ case class PaimonAnalyzeTableColumnCommand(
}
columnsToAnalyze.foreach {
attr =>
- if (!Utils.analyzeSupportsType(attr.dataType)) {
+ if (!StatsUtils.analyzeSupportsType(attr.dataType)) {
throw new UnsupportedOperationException(
s"Analyzing on col: ${attr.name}, data type: ${attr.dataType} is
not supported.")
}
@@ -146,11 +147,12 @@ case class PaimonAnalyzeTableColumnCommand(
}
/**
- * Convert data from spark type to paimon, only cover datatype meet
[[Utils.hasMinMax]] currently.
+ * Convert data from spark type to paimon, only cover datatype meet
[[StatsUtils.hasMinMax]]
+ * currently.
*/
private def toPaimonData(o: Any, dataType: DataType): Any = {
dataType match {
- case d if !Utils.hasMinMax(d) =>
+ case d if !StatsUtils.hasMinMax(d) =>
// should not reach here
throw new UnsupportedOperationException(s"Unsupported data type $d,
value is $o.")
case _: DecimalType =>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/StatsUtils.scala
similarity index 57%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/StatsUtils.scala
index 5ff1de73c..d70309a83 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/StatsUtils.scala
@@ -18,16 +18,11 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.expressions.{FieldReference,
NamedReference}
import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
-import org.apache.spark.util.{Utils => SparkUtils}
import java.net.URI
@@ -36,47 +31,8 @@ import java.net.URI
* [[org.apache.spark.sql]] package, Hence, use this class to adapt then so
that we can use them
* indirectly.
*/
-object Utils {
+object StatsUtils {
- /**
- * In the streaming write case, An "Queries with streaming sources must be
executed with
- * writeStream.start()" error will occur if we transform [[DataFrame]] first
and then use it.
- *
- * That's because the new [[DataFrame]] has a streaming source that is not
supported, see the
- * detail: SPARK-14473. So we can create a new [[DataFrame]] using the
origin, planned
- * [[org.apache.spark.sql.execution.SparkPlan]].
- *
- * By the way, the origin [[DataFrame]] has been planned by
- * [[org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy]]
before call
- * [[org.apache.spark.sql.execution.streaming.Sink.addBatch]].
- */
- def createNewDataFrame(data: DataFrame): DataFrame = {
- data.sqlContext.internalCreateDataFrame(data.queryExecution.toRdd,
data.schema)
- }
-
- def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan):
Dataset[Row] = {
- Dataset.ofRows(sparkSession, logicalPlan)
- }
-
- def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]):
Seq[Expression] = {
- DataSourceStrategy.normalizeExprs(exprs, attributes)
- }
-
- def translateFilter(
- predicate: Expression,
- supportNestedPredicatePushdown: Boolean): Option[Filter] = {
- DataSourceStrategy.translateFilter(predicate,
supportNestedPredicatePushdown)
- }
-
- def fieldReference(name: String): NamedReference = {
- FieldReference.column(name)
- }
-
- def bytesToString(size: Long): String = {
- SparkUtils.bytesToString(size)
- }
-
- // for analyze
def calculateTotalSize(
sessionState: SessionState,
tableName: String,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
index 5ff1de73c..e8f1b418a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
@@ -17,20 +17,13 @@
*/
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{FieldReference,
NamedReference}
-import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.internal.SessionState
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType,
DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
import org.apache.spark.util.{Utils => SparkUtils}
-import java.net.URI
-
/**
* Some classes or methods defined in the spark project are marked as private
under
* [[org.apache.spark.sql]] package, Hence, use this class to adapt then so
that we can use them
@@ -75,42 +68,4 @@ object Utils {
def bytesToString(size: Long): String = {
SparkUtils.bytesToString(size)
}
-
- // for analyze
- def calculateTotalSize(
- sessionState: SessionState,
- tableName: String,
- locationUri: Option[URI]): Long = {
- CommandUtils.calculateSingleLocationSize(
- sessionState,
- new TableIdentifier(tableName),
- locationUri)
- }
-
- def computeColumnStats(
- sparkSession: SparkSession,
- relation: LogicalPlan,
- columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
- CommandUtils.computeColumnStats(sparkSession, relation, columns)
- }
-
- /** [[IntegralType]] is private in spark, therefore we need add it here. */
- def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case BooleanType => true
- case _: DatetimeType => true
- case BinaryType | StringType => true
- case _ => false
- }
-
- def hasMinMax(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case BooleanType => true
- case _: DatetimeType => true
- case _ => false
- }
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index 5656fa60e..1b221f9ef 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -298,6 +298,46 @@ abstract class AnalyzeTableTestBase extends
PaimonSparkTestBase {
Assertions.assertEquals(2L, stats.rowCount.get.longValue())
}
+  test("Paimon analyze: spark use col stats") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('primary-key'='id')
+                 |""".stripMargin)
+
+    // Two single-row inserts, then collect table and column statistics.
+    Seq(
+      s"INSERT INTO T VALUES ('1', 'a', 1, 1)",
+      s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)",
+      s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS"
+    ).foreach(statement => spark.sql(statement))
+
+    val stats = getScanStatistic("SELECT * FROM T")
+    // Stats for all four columns are visible only when Spark uses column stats of v2 tables.
+    val expectedColStatsCount = if (supportsColStats()) 4 else 0
+    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
+    Assertions.assertEquals(expectedColStatsCount, stats.attributeStats.size)
+  }
+
+  test("Paimon analyze: partition filter push down hit") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt INT)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 'd', 3)")
+    spark.sql("ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
+
+    // Paimon retains the partition filter and does not return it to Spark, so the scan
+    // statistics themselves must be filtered correctly.
+    // Partition push down hit: with column stats the estimated row count drops to 0; without
+    // them it stays at the full 4 rows.
+    val partitionFilterSql = "SELECT * FROM T WHERE pt < 1"
+    Assertions.assertEquals(
+      if (supportsColStats()) 0L else 4L,
+      getScanStatistic(partitionFilterSql).rowCount.get.longValue())
+    checkAnswer(spark.sql(partitionFilterSql), Nil)
+
+    // Partition push down not hit: the estimate is the full 4 rows.
+    val dataFilterSql = "SELECT * FROM T WHERE id < 1"
+    Assertions.assertEquals(4L, getScanStatistic(dataFilterSql).rowCount.get.longValue())
+    checkAnswer(spark.sql(dataFilterSql), Nil)
+  }
+
+ /** Number of entries that `fileIO` lists under the `statistics` directory of `tableLocation`. */
protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
fileIO.listStatus(new Path(tableLocation, "statistics")).length
}
@@ -311,4 +351,7 @@ abstract class AnalyzeTableTestBase extends
PaimonSparkTestBase {
.get
relation.computeStats()
}
+
+ /**
+ * Whether this Spark version uses column stats of v2 tables (Spark 3.4+); the analyze tests use
+ * it to pick their expected statistics. Modules for older Spark versions override it to false.
+ */
+ protected def supportsColStats(): Boolean = true
}