This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa9305be2 [spark] Introduce `read.allow.fullScan` (#6786)
2fa9305be2 is described below
commit 2fa9305be25cd135366a94e4a7634c51b1f21a93
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 10 15:34:55 2025 +0800
[spark] Introduce `read.allow.fullScan` (#6786)
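
Adds a Spark connector option `read.allow.fullScan` (default: true).
When set to false, a batch read of a partitioned table fails with
"Full scan is not supported." unless partition pruning actually
skipped some files. As the new test shows, `sys.max_pt('t_p')` by
itself still works under the restriction (it does not plan a scan of
the table), while using it inside a query over the table does not.

A minimal usage sketch, setting the session key the same way as the
new test does:

    // Scala, in a Spark session with a Paimon catalog configured
    spark.conf.set("spark.paimon.read.allow.fullScan", "false")
    spark.sql("SELECT * FROM t_p WHERE p > 1").show() // ok: partition pruned
    spark.sql("SELECT * FROM t_p").show()             // throws: full scan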
---
.../generated/spark_connector_configuration.html | 6 +++++
.../apache/paimon/spark/SparkConnectorOptions.java | 7 ++++++
.../org/apache/paimon/spark/PaimonBaseScan.scala | 25 ++++++++++++++++----
.../paimon/spark/PaimonFormatTableBaseScan.scala | 1 -
.../org/apache/paimon/spark/util/OptionUtils.scala | 4 ++++
.../apache/paimon/spark/sql/PaimonQueryTest.scala | 27 ++++++++++++++++++++++
6 files changed, 65 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 06f5093477..8f06adecf9 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -26,6 +26,12 @@ under the License.
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>read.allow.fullScan</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+        <td>Whether to allow full scan when reading a partitioned table.</td>
+ </tr>
<tr>
<td><h5>read.changelog</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index f2c0877bb4..40ad19f44c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -91,4 +91,11 @@ public class SparkConnectorOptions {
.defaultValue(false)
.withDescription(
"Whether to read row in the form of changelog (add
rowkind column in row to represent its change type).");
+
+ public static final ConfigOption<Boolean> READ_ALLOW_FULL_SCAN =
+ key("read.allow.fullScan")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+                        "Whether to allow full scan when reading a partitioned table.");
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index 0509d6ad89..7ae11b7ce3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -18,17 +18,16 @@
package org.apache.paimon.spark
-import org.apache.paimon.{stats, CoreOptions, Snapshot}
import org.apache.paimon.annotation.VisibleForTesting
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.spark.statistics.StatisticsHelper
-import org.apache.paimon.table.{DataTable, InnerTable}
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.stats
+import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{InnerTableScan, Split}
-import org.apache.paimon.table.source.snapshot.TimeTravelUtil
-import PaimonImplicits._
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
@@ -88,6 +87,7 @@ abstract class PaimonBaseScan(
}
override def toBatch: Batch = {
+ ensureNoFullScan()
PaimonBatch(lazyInputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
}
@@ -121,6 +121,23 @@ abstract class PaimonBaseScan(
paimonMetricsRegistry.buildSparkScanMetrics()
}
+ private def ensureNoFullScan(): Unit = {
+ if (OptionUtils.readAllowFullScan()) {
+ return
+ }
+
+ table match {
+ case t: FileStoreTable if !t.partitionKeys().isEmpty =>
+        val skippedFiles = paimonMetricsRegistry.buildSparkScanMetrics().collectFirst {
+ case m: PaimonSkippedTableFilesTaskMetric => m.value
+ }
+ if (skippedFiles.contains(0)) {
+ throw new RuntimeException("Full scan is not supported.")
+ }
+ case _ =>
+ }
+ }
+
override def description(): String = {
val pushedFiltersStr = if (filters.nonEmpty) {
", PushedFilters: [" + filters.mkString(",") + "]"
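
Note on the guard: ensureNoFullScan() does not inspect the pushed-down
predicates; it consults the scan metrics after planning. For a partitioned
FileStoreTable, a reported skipped-files count of exactly 0 means nothing was
pruned, so the read is rejected as a full scan; unpartitioned and non-file-store
tables are never rejected. A condensed sketch of that decision, with
PaimonSkippedTableFilesTaskMetric assumed to report the number of table files
skipped by pruning:

    // Hedged sketch mirroring ensureNoFullScan() above.
    val skipped: Option[Long] = paimonMetricsRegistry
      .buildSparkScanMetrics()
      .collectFirst { case m: PaimonSkippedTableFilesTaskMetric => m.value }
    // Some(0): metric reported, but no files skipped -> treat as full scan.
    // None: metric absent, nothing to conclude -> allowed.
    if (!table.partitionKeys().isEmpty && skipped.contains(0L)) {
      throw new RuntimeException("Full scan is not supported.")
    }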
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 06ea0998e2..a0dd474436 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -18,7 +18,6 @@
package org.apache.paimon.spark
-import org.apache.paimon.CoreOptions
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.table.FormatTable
import org.apache.paimon.table.source.Split
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index d2dbdfd64d..3a09919f85 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -96,6 +96,10 @@ object OptionUtils extends SQLConfHelper {
getOptionString(SparkCatalogOptions.V1FUNCTION_ENABLED).toBoolean
}
+ def readAllowFullScan(): Boolean = {
+ getOptionString(SparkConnectorOptions.READ_ALLOW_FULL_SCAN).toBoolean
+ }
+
private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
val mergedOptions = new JHashMap[String, String](
conf.getAllConfs
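
readAllowFullScan() follows the same pattern as v1FunctionEnabled above: the
value is resolved from the merged Spark SQL conf, so flipping the session conf
takes effect on the next scan. A minimal check, assuming the standard
spark.paimon. prefix used elsewhere in the connector:

    // Hedged sketch: the session conf key maps to the option key
    // "read.allow.fullScan" with the "spark.paimon." prefix stripped.
    spark.sql("SET spark.paimon.read.allow.fullScan=false")
    assert(!OptionUtils.readAllowFullScan())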
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index d9df8981b3..5d0864cfe5 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -401,6 +401,33 @@ class PaimonQueryTest extends PaimonSparkTestBase {
.contains("Only append table or deletion vector table support querying
metadata columns."))
}
+ test("Paimon Query: disallow full scan") {
+ withTable("t", "t_p") {
+ sql("CREATE TABLE t (a INT)")
+ sql("INSERT INTO t VALUES (1), (2)")
+ withSparkSQLConf("spark.paimon.read.allow.fullScan" -> "false") {
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1), Row(2)))
+ }
+
+ sql("CREATE TABLE t_p (a INT, p INT) PARTITIONED BY (p)")
+ sql("INSERT INTO t_p VALUES (1, 1), (2, 2)")
+ withSparkSQLConf("spark.paimon.read.allow.fullScan" -> "false") {
+ assert(
+ intercept[Exception](sql("SELECT * FROM t_p").collect()).getMessage
+ .contains("Full scan is not supported."))
+ assert(
+          intercept[Exception](sql("SELECT * FROM t_p WHERE p > 0").collect()).getMessage
+ .contains("Full scan is not supported."))
+ checkAnswer(sql("SELECT * FROM t_p WHERE p > 1"), Seq(Row(2, 2)))
+
+ checkAnswer(sql("SELECT sys.max_pt('t_p')"), Seq(Row("2")))
+ assert(
+          intercept[Exception](sql("SELECT sys.max_pt('t_p') FROM t_p").collect()).getMessage
+ .contains("Full scan is not supported."))
+ }
+ }
+ }
+
private def getAllFiles(
tableName: String,
partitions: Seq[String],