This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d099a7941 [spark] Fix analyze compatibility with spark3.3- (#2835)
d099a7941 is described below

commit d099a7941141e97e0f0e122587eed4f21c8946fd
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Feb 2 15:52:41 2024 +0800

    [spark] Fix analyze compatibility with spark3.3- (#2835)
---
 .../scala/org/apache/spark/sql/StatsUtils.scala    | 70 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/StatsUtils.scala    | 71 ++++++++++++++++++++++
 .../apache/paimon/spark/sql/AnalyzeTableTest.scala | 23 +++++++
 .../apache/paimon/spark/sql/AnalyzeTableTest.scala | 41 +------------
 .../apache/paimon/spark/sql/AnalyzeTableTest.scala | 43 +------------
 .../apache/paimon/spark/sql/AnalyzeTableTest.scala | 43 +------------
 .../commands/PaimonAnalyzeTableColumnCommand.scala | 14 +++--
 .../spark/sql/{Utils.scala => StatsUtils.scala}    | 48 +--------------
 .../main/scala/org/apache/spark/sql/Utils.scala    | 47 +-------------
 .../paimon/spark/sql/AnalyzeTableTestBase.scala    | 43 +++++++++++++
 10 files changed, 221 insertions(+), 222 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/StatsUtils.scala
 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/StatsUtils.scala
new file mode 100644
index 000000000..8734e7563
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/spark/sql/StatsUtils.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, 
DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType, 
TimestampType}
+
+import java.net.URI
+
+object StatsUtils {
+
+  def calculateTotalSize(
+      sessionState: SessionState,
+      tableName: String,
+      locationUri: Option[URI]): Long = {
+    CommandUtils.calculateSingleLocationSize(
+      sessionState,
+      new TableIdentifier(tableName),
+      locationUri)
+  }
+
+  def computeColumnStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
+    CommandUtils.computeColumnStats(sparkSession, relation, columns)
+  }
+
+  /** DatetimeType was added after spark33, overwrite it for compatibility. */
+  def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case BinaryType | StringType => true
+    case _ => false
+  }
+
+  /** DatetimeType was added after spark33, overwrite it for compatibility. */
+  def hasMinMax(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case _ => false
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/StatsUtils.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/StatsUtils.scala
new file mode 100644
index 000000000..44aaa6b3b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/StatsUtils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, 
DateType, DecimalType, DoubleType, FloatType, IntegralType, StringType, 
TimestampNTZType, TimestampType}
+
+import java.net.URI
+
+object StatsUtils {
+  def calculateTotalSize(
+      sessionState: SessionState,
+      tableName: String,
+      locationUri: Option[URI]): Long = {
+    CommandUtils.calculateSingleLocationSize(
+      sessionState,
+      new TableIdentifier(tableName),
+      locationUri)
+  }
+
+  def computeColumnStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
+    CommandUtils.computeColumnStats(sparkSession, relation, columns)
+  }
+
+  /** DatetimeType was added after spark33, overwrite it for compatibility. */
+  def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case TimestampNTZType => true
+    case BinaryType | StringType => true
+    case _ => false
+  }
+
+  /** DatetimeType was added after spark33, overwrite it for compatibility. */
+  def hasMinMax(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case TimestampNTZType => true
+    case _ => false
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
new file mode 100644
index 000000000..2b5f10297
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+class AnalyzeTableTest extends AnalyzeTableTestBase {
+
+  override protected def supportsColStats(): Boolean = false
+}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
index ff65336a1..2b5f10297 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -17,46 +17,7 @@
  */
 package org.apache.paimon.spark.sql
 
-import org.junit.jupiter.api.Assertions
-
 class AnalyzeTableTest extends AnalyzeTableTestBase {
 
-  test("Paimon analyze: spark use col stats") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
-                 |USING PAIMON
-                 |TBLPROPERTIES ('primary-key'='id')
-                 |""".stripMargin)
-
-    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
-    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
-    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
-    val stats = getScanStatistic("SELECT * FROM T")
-    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
-    // spark 33' v2 table not support col stats
-    Assertions.assertEquals(0, stats.attributeStats.size)
-  }
-
-  test("Paimon analyze: partition filter push down hit") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id INT, name STRING, pt INT)
-                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
-                 |PARTITIONED BY (pt)
-                 |""".stripMargin)
-
-    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 
'd', 3)")
-    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
-    // partition push down hit
-    var sql = "SELECT * FROM T WHERE pt < 1"
-    // spark 33' v2 table not support col stats
-    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
-    checkAnswer(spark.sql(sql), Nil)
-
-    // partition push down not hit
-    sql = "SELECT * FROM T WHERE id < 1"
-    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
-    checkAnswer(spark.sql(sql), Nil)
-  }
+  override protected def supportsColStats(): Boolean = false
 }
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
index 723b8082b..c6f788eab 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -17,45 +17,4 @@
  */
 package org.apache.paimon.spark.sql
 
-import org.junit.jupiter.api.Assertions
-
-class AnalyzeTableTest extends AnalyzeTableTestBase {
-
-  test("Paimon analyze: spark use col stats") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
-                 |USING PAIMON
-                 |TBLPROPERTIES ('primary-key'='id')
-                 |""".stripMargin)
-
-    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
-    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
-    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
-    val stats = getScanStatistic("SELECT * FROM T")
-    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
-    Assertions.assertEquals(4, stats.attributeStats.size)
-  }
-
-  test("Paimon analyze: partition filter push down hit") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id INT, name STRING, pt INT)
-                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
-                 |PARTITIONED BY (pt)
-                 |""".stripMargin)
-
-    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 
'd', 3)")
-    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
-    // paimon will reserve partition filter and not return it to spark, we 
need to ensure stats are filtered correctly.
-    // partition push down hit
-    var sql = "SELECT * FROM T WHERE pt < 1"
-    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue())
-    checkAnswer(spark.sql(sql), Nil)
-
-    // partition push down not hit
-    sql = "SELECT * FROM T WHERE id < 1"
-    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
-    checkAnswer(spark.sql(sql), Nil)
-  }
-}
+class AnalyzeTableTest extends AnalyzeTableTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
index 723b8082b..c6f788eab 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTest.scala
@@ -17,45 +17,4 @@
  */
 package org.apache.paimon.spark.sql
 
-import org.junit.jupiter.api.Assertions
-
-class AnalyzeTableTest extends AnalyzeTableTestBase {
-
-  test("Paimon analyze: spark use col stats") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
-                 |USING PAIMON
-                 |TBLPROPERTIES ('primary-key'='id')
-                 |""".stripMargin)
-
-    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
-    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
-    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
-    val stats = getScanStatistic("SELECT * FROM T")
-    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
-    Assertions.assertEquals(4, stats.attributeStats.size)
-  }
-
-  test("Paimon analyze: partition filter push down hit") {
-    spark.sql(s"""
-                 |CREATE TABLE T (id INT, name STRING, pt INT)
-                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
-                 |PARTITIONED BY (pt)
-                 |""".stripMargin)
-
-    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 
'd', 3)")
-    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
-
-    // paimon will reserve partition filter and not return it to spark, we 
need to ensure stats are filtered correctly.
-    // partition push down hit
-    var sql = "SELECT * FROM T WHERE pt < 1"
-    Assertions.assertEquals(0L, getScanStatistic(sql).rowCount.get.longValue())
-    checkAnswer(spark.sql(sql), Nil)
-
-    // partition push down not hit
-    sql = "SELECT * FROM T WHERE id < 1"
-    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
-    checkAnswer(spark.sql(sql), Nil)
-  }
-}
+class AnalyzeTableTest extends AnalyzeTableTestBase {}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index e9355a5cb..a6576e807 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.BatchWriteBuilder
 
 import org.apache.parquet.Preconditions
-import org.apache.spark.sql.{Row, SparkSession, Utils}
+import org.apache.spark.sql.{Row, SparkSession, StatsUtils}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -56,11 +56,12 @@ case class PaimonAnalyzeTableColumnCommand(
 
     // compute stats
     val attributes = getColumnsToAnalyze(relation, columnNames, allColumns)
-    val totalSize = Utils.calculateTotalSize(
+    val totalSize = StatsUtils.calculateTotalSize(
       sparkSession.sessionState,
       table.name(),
       Some(table.location().toUri))
-    val (mergedRecordCount, colStats) = Utils.computeColumnStats(sparkSession, 
relation, attributes)
+    val (mergedRecordCount, colStats) =
+      StatsUtils.computeColumnStats(sparkSession, relation, attributes)
 
     val totalRecordCount = currentSnapshot.totalRecordCount()
     Preconditions.checkState(
@@ -111,7 +112,7 @@ case class PaimonAnalyzeTableColumnCommand(
     }
     columnsToAnalyze.foreach {
       attr =>
-        if (!Utils.analyzeSupportsType(attr.dataType)) {
+        if (!StatsUtils.analyzeSupportsType(attr.dataType)) {
           throw new UnsupportedOperationException(
             s"Analyzing on col: ${attr.name}, data type: ${attr.dataType} is 
not supported.")
         }
@@ -146,11 +147,12 @@ case class PaimonAnalyzeTableColumnCommand(
   }
 
   /**
-   * Convert data from spark type to paimon, only cover datatype meet 
[[Utils.hasMinMax]] currently.
+   * Convert data from spark type to paimon, only cover datatype meet 
[[StatsUtils.hasMinMax]]
+   * currently.
    */
   private def toPaimonData(o: Any, dataType: DataType): Any = {
     dataType match {
-      case d if !Utils.hasMinMax(d) =>
+      case d if !StatsUtils.hasMinMax(d) =>
         // should not reach here
         throw new UnsupportedOperationException(s"Unsupported data type $d, 
value is $o.")
       case _: DecimalType =>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/StatsUtils.scala
similarity index 57%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/StatsUtils.scala
index 5ff1de73c..d70309a83 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/StatsUtils.scala
@@ -18,16 +18,11 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
-import org.apache.spark.sql.connector.expressions.{FieldReference, 
NamedReference}
 import org.apache.spark.sql.execution.command.CommandUtils
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.internal.SessionState
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, 
DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
-import org.apache.spark.util.{Utils => SparkUtils}
 
 import java.net.URI
 
@@ -36,47 +31,8 @@ import java.net.URI
  * [[org.apache.spark.sql]] package, Hence, use this class to adapt then so 
that we can use them
  * indirectly.
  */
-object Utils {
+object StatsUtils {
 
-  /**
-   * In the streaming write case, An "Queries with streaming sources must be 
executed with
-   * writeStream.start()" error will occur if we transform [[DataFrame]] first 
and then use it.
-   *
-   * That's because the new [[DataFrame]] has a streaming source that is not 
supported, see the
-   * detail: SPARK-14473. So we can create a new [[DataFrame]] using the 
origin, planned
-   * [[org.apache.spark.sql.execution.SparkPlan]].
-   *
-   * By the way, the origin [[DataFrame]] has been planned by
-   * [[org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy]] 
before call
-   * [[org.apache.spark.sql.execution.streaming.Sink.addBatch]].
-   */
-  def createNewDataFrame(data: DataFrame): DataFrame = {
-    data.sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, 
data.schema)
-  }
-
-  def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan): 
Dataset[Row] = {
-    Dataset.ofRows(sparkSession, logicalPlan)
-  }
-
-  def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): 
Seq[Expression] = {
-    DataSourceStrategy.normalizeExprs(exprs, attributes)
-  }
-
-  def translateFilter(
-      predicate: Expression,
-      supportNestedPredicatePushdown: Boolean): Option[Filter] = {
-    DataSourceStrategy.translateFilter(predicate, 
supportNestedPredicatePushdown)
-  }
-
-  def fieldReference(name: String): NamedReference = {
-    FieldReference.column(name)
-  }
-
-  def bytesToString(size: Long): String = {
-    SparkUtils.bytesToString(size)
-  }
-
-  // for analyze
   def calculateTotalSize(
       sessionState: SessionState,
       tableName: String,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
index 5ff1de73c..e8f1b418a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
@@ -17,20 +17,13 @@
  */
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.{FieldReference, 
NamedReference}
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{BinaryType, BooleanType, DataType, 
DatetimeType, DecimalType, DoubleType, FloatType, IntegralType, StringType}
 import org.apache.spark.util.{Utils => SparkUtils}
 
-import java.net.URI
-
 /**
  * Some classes or methods defined in the spark project are marked as private 
under
  * [[org.apache.spark.sql]] package, Hence, use this class to adapt then so 
that we can use them
@@ -75,42 +68,4 @@ object Utils {
   def bytesToString(size: Long): String = {
     SparkUtils.bytesToString(size)
   }
-
-  // for analyze
-  def calculateTotalSize(
-      sessionState: SessionState,
-      tableName: String,
-      locationUri: Option[URI]): Long = {
-    CommandUtils.calculateSingleLocationSize(
-      sessionState,
-      new TableIdentifier(tableName),
-      locationUri)
-  }
-
-  def computeColumnStats(
-      sparkSession: SparkSession,
-      relation: LogicalPlan,
-      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
-    CommandUtils.computeColumnStats(sparkSession, relation, columns)
-  }
-
-  /** [[IntegralType]] is private in spark, therefore we need add it here. */
-  def analyzeSupportsType(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case BooleanType => true
-    case _: DatetimeType => true
-    case BinaryType | StringType => true
-    case _ => false
-  }
-
-  def hasMinMax(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case BooleanType => true
-    case _: DatetimeType => true
-    case _ => false
-  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
index 5656fa60e..1b221f9ef 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala
@@ -298,6 +298,46 @@ abstract class AnalyzeTableTestBase extends 
PaimonSparkTestBase {
     Assertions.assertEquals(2L, stats.rowCount.get.longValue())
   }
 
+  test("Paimon analyze: spark use col stats") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id STRING, name STRING, i INT, l LONG)
+                 |USING PAIMON
+                 |TBLPROPERTIES ('primary-key'='id')
+                 |""".stripMargin)
+
+    spark.sql(s"INSERT INTO T VALUES ('1', 'a', 1, 1)")
+    spark.sql(s"INSERT INTO T VALUES ('2', 'aaa', 1, 2)")
+    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
+
+    val stats = getScanStatistic("SELECT * FROM T")
+    Assertions.assertEquals(2L, stats.rowCount.get.longValue())
+    Assertions.assertEquals(if (supportsColStats()) 4 else 0, 
stats.attributeStats.size)
+  }
+
+  test("Paimon analyze: partition filter push down hit") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, pt INT)
+                 |TBLPROPERTIES ('primary-key'='id, pt', 'bucket'='2')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', 1), (2, 'b', 1), (3, 'c', 2), (4, 
'd', 3)")
+    spark.sql(s"ANALYZE TABLE T COMPUTE STATISTICS FOR ALL COLUMNS")
+
+    // paimon will reserve partition filter and not return it to spark, we 
need to ensure stats are filtered correctly.
+    // partition push down hit
+    var sql = "SELECT * FROM T WHERE pt < 1"
+    Assertions.assertEquals(
+      if (supportsColStats()) 0L else 4L,
+      getScanStatistic(sql).rowCount.get.longValue())
+    checkAnswer(spark.sql(sql), Nil)
+
+    // partition push down not hit
+    sql = "SELECT * FROM T WHERE id < 1"
+    Assertions.assertEquals(4L, getScanStatistic(sql).rowCount.get.longValue())
+    checkAnswer(spark.sql(sql), Nil)
+  }
+
   protected def statsFileCount(tableLocation: Path, fileIO: FileIO): Int = {
     fileIO.listStatus(new Path(tableLocation, "statistics")).length
   }
@@ -311,4 +351,7 @@ abstract class AnalyzeTableTestBase extends 
PaimonSparkTestBase {
       .get
     relation.computeStats()
   }
+
+  /** Spark supports the use of col stats for v2 table since 3.4+. */
+  protected def supportsColStats(): Boolean = true
 }

Reply via email to