This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa9305be2 [spark] Introduce `read.allow.fullScan` (#6786)
2fa9305be2 is described below

commit 2fa9305be25cd135366a94e4a7634c51b1f21a93
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 10 15:34:55 2025 +0800

    [spark] Introduce `read.allow.fullScan` (#6786)
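
    The new Spark connector option `read.allow.fullScan` (Boolean, default
    true) controls whether a batch read of a partitioned Paimon table may
    scan all of its files: when set to false, a query whose predicates
    prune no partition files fails fast instead of silently scanning the
    whole table. A minimal usage sketch in Spark SQL, assuming a
    partitioned table `t_p` with partition column `p` as in the test below:

        -- disable full scans for the session
        SET spark.paimon.read.allow.fullScan=false;

        SELECT * FROM t_p;              -- fails: no partition is pruned
        SELECT * FROM t_p WHERE p > 1;  -- succeeds: partition pruning applies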
---
 .../generated/spark_connector_configuration.html   |  6 +++++
 .../apache/paimon/spark/SparkConnectorOptions.java |  7 ++++++
 .../org/apache/paimon/spark/PaimonBaseScan.scala   | 25 ++++++++++++++++----
 .../paimon/spark/PaimonFormatTableBaseScan.scala   |  1 -
 .../org/apache/paimon/spark/util/OptionUtils.scala |  4 ++++
 .../apache/paimon/spark/sql/PaimonQueryTest.scala  | 27 ++++++++++++++++++++++
 6 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 06f5093477..8f06adecf9 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -26,6 +26,12 @@ under the License.
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>read.allow.fullScan</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to allow full scan when reading a partitioned table.</td>
+        </tr>
         <tr>
             <td><h5>read.changelog</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index f2c0877bb4..40ad19f44c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -91,4 +91,11 @@ public class SparkConnectorOptions {
                     .defaultValue(false)
                     .withDescription(
                             "Whether to read row in the form of changelog (add 
rowkind column in row to represent its change type).");
+
+    public static final ConfigOption<Boolean> READ_ALLOW_FULL_SCAN =
+            key("read.allow.fullScan")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to allow full scan when reading a 
partitioned table.");
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 0509d6ad89..7ae11b7ce3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -18,17 +18,16 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.{stats, CoreOptions, Snapshot}
 import org.apache.paimon.annotation.VisibleForTesting
 import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream
 import org.apache.paimon.spark.statistics.StatisticsHelper
-import org.apache.paimon.table.{DataTable, InnerTable}
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.stats
+import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{InnerTableScan, Split}
-import org.apache.paimon.table.source.snapshot.TimeTravelUtil
 
-import PaimonImplicits._
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
@@ -88,6 +87,7 @@ abstract class PaimonBaseScan(
   }
 
   override def toBatch: Batch = {
+    ensureNoFullScan()
+    PaimonBatch(lazyInputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
   }
 
@@ -121,6 +121,23 @@ abstract class PaimonBaseScan(
     paimonMetricsRegistry.buildSparkScanMetrics()
   }
 
+  private def ensureNoFullScan(): Unit = {
+    if (OptionUtils.readAllowFullScan()) {
+      return
+    }
+
+    table match {
+      case t: FileStoreTable if !t.partitionKeys().isEmpty =>
+        val skippedFiles = paimonMetricsRegistry.buildSparkScanMetrics().collectFirst {
+          case m: PaimonSkippedTableFilesTaskMetric => m.value
+        }
+        if (skippedFiles.contains(0)) {
+          throw new RuntimeException("Full scan is not supported.")
+        }
+      case _ =>
+    }
+  }
+
   override def description(): String = {
     val pushedFiltersStr = if (filters.nonEmpty) {
       ", PushedFilters: [" + filters.mkString(",") + "]"
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 06ea0998e2..a0dd474436 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.table.FormatTable
 import org.apache.paimon.table.source.Split
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index d2dbdfd64d..3a09919f85 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -96,6 +96,10 @@ object OptionUtils extends SQLConfHelper {
     getOptionString(SparkCatalogOptions.V1FUNCTION_ENABLED).toBoolean
   }
 
+  def readAllowFullScan(): Boolean = {
+    getOptionString(SparkConnectorOptions.READ_ALLOW_FULL_SCAN).toBoolean
+  }
+
   private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
     val mergedOptions = new JHashMap[String, String](
       conf.getAllConfs
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index d9df8981b3..5d0864cfe5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -401,6 +401,33 @@ class PaimonQueryTest extends PaimonSparkTestBase {
         .contains("Only append table or deletion vector table support querying 
metadata columns."))
   }
 
+  test("Paimon Query: disallow full scan") {
+    withTable("t", "t_p") {
+      sql("CREATE TABLE t (a INT)")
+      sql("INSERT INTO t VALUES (1), (2)")
+      withSparkSQLConf("spark.paimon.read.allow.fullScan" -> "false") {
+        checkAnswer(sql("SELECT * FROM t"), Seq(Row(1), Row(2)))
+      }
+
+      sql("CREATE TABLE t_p (a INT, p INT) PARTITIONED BY (p)")
+      sql("INSERT INTO t_p VALUES (1, 1), (2, 2)")
+      withSparkSQLConf("spark.paimon.read.allow.fullScan" -> "false") {
+        assert(
+          intercept[Exception](sql("SELECT * FROM t_p").collect()).getMessage
+            .contains("Full scan is not supported."))
+        assert(
+          intercept[Exception](sql("SELECT * FROM t_p WHERE p > 
0").collect()).getMessage
+            .contains("Full scan is not supported."))
+        checkAnswer(sql("SELECT * FROM t_p WHERE p > 1"), Seq(Row(2, 2)))
+
+        checkAnswer(sql("SELECT sys.max_pt('t_p')"), Seq(Row("2")))
+        assert(
+          intercept[Exception](sql("SELECT sys.max_pt('t_p') FROM 
t_p").collect()).getMessage
+            .contains("Full scan is not supported."))
+      }
+    }
+  }
+
   private def getAllFiles(
       tableName: String,
       partitions: Seq[String],
