This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch fix/sort-merge-reservation-starvation
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit dbf7781bd88b3bb13dd0fed8a6926cf73090bd3d
Author: xudong.w <[email protected]>
AuthorDate: Tue Jan 27 15:37:46 2026 +0800

    Cherry pick limit pruning from upstream (#29)
    
    Co-authored-by: Yongting You <[email protected]>
    Co-authored-by: Martin Grigorov <[email protected]>
---
 datafusion/core/tests/parquet/mod.rs               |  88 ++++++---
 datafusion/core/tests/parquet/row_group_pruning.rs |  93 +++++++--
 datafusion/datasource-parquet/src/metrics.rs       |  25 +--
 datafusion/datasource-parquet/src/opener.rs        |  31 ++-
 .../datasource-parquet/src/row_group_filter.rs     | 207 ++++++++++++++++-----
 datafusion/datasource-parquet/src/source.rs        |   2 +-
 datafusion/datasource/src/file_scan_config.rs      |  41 +++-
 datafusion/datasource/src/source.rs                |  17 ++
 .../optimizer/src/optimize_projections/mod.rs      |  13 +-
 .../physical-optimizer/src/enforce_sorting/mod.rs  |  10 +-
 .../physical-optimizer/src/limit_pushdown.rs       |  27 ++-
 .../physical-plan/src/coalesce_partitions.rs       |  13 ++
 datafusion/physical-plan/src/execution_plan.rs     |  13 ++
 datafusion/physical-plan/src/filter.rs             |  13 ++
 datafusion/physical-plan/src/limit.rs              |  29 +++
 datafusion/physical-plan/src/metrics/value.rs      |  27 ++-
 datafusion/physical-plan/src/projection.rs         |  13 ++
 .../src/sorts/sort_preserving_merge.rs             |  13 ++
 datafusion/sqllogictest/README.md                  |  11 ++
 datafusion/sqllogictest/src/util.rs                |  47 +++++
 .../sqllogictest/test_files/limit_pruning.slt      |  78 ++++++++
 .../sqllogictest/test_files/slt_features.slt       |  74 ++++++++
 docs/source/user-guide/explain-usage.md            |   1 +
 23 files changed, 749 insertions(+), 137 deletions(-)

diff --git a/datafusion/core/tests/parquet/mod.rs 
b/datafusion/core/tests/parquet/mod.rs
index 8caeda901b..85e1185395 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -30,6 +30,7 @@ use arrow::{
     record_batch::RecordBatch,
     util::pretty::pretty_format_batches,
 };
+use arrow_schema::SchemaRef;
 use chrono::{Datelike, Duration, TimeDelta};
 use datafusion::{
     datasource::{provider_as_source, TableProvider},
@@ -109,6 +110,26 @@ struct ContextWithParquet {
     ctx: SessionContext,
 }
 
+struct PruningMetric {
+    total_pruned: usize,
+    total_matched: usize,
+    total_fully_matched: usize,
+}
+
+impl PruningMetric {
+    pub fn total_pruned(&self) -> usize {
+        self.total_pruned
+    }
+
+    pub fn total_matched(&self) -> usize {
+        self.total_matched
+    }
+
+    pub fn total_fully_matched(&self) -> usize {
+        self.total_fully_matched
+    }
+}
+
 /// The output of running one of the test cases
 struct TestOutput {
     /// The input query SQL
@@ -126,8 +147,8 @@ struct TestOutput {
 impl TestOutput {
     /// retrieve the value of the named metric, if any
     fn metric_value(&self, metric_name: &str) -> Option<usize> {
-        if let Some((pruned, _matched)) = self.pruning_metric(metric_name) {
-            return Some(pruned);
+        if let Some(pm) = self.pruning_metric(metric_name) {
+            return Some(pm.total_pruned());
         }
 
         self.parquet_metrics
@@ -140,9 +161,10 @@ impl TestOutput {
             })
     }
 
-    fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> {
+    fn pruning_metric(&self, metric_name: &str) -> Option<PruningMetric> {
         let mut total_pruned = 0;
         let mut total_matched = 0;
+        let mut total_fully_matched = 0;
         let mut found = false;
 
         for metric in self.parquet_metrics.iter() {
@@ -154,13 +176,19 @@ impl TestOutput {
                 {
                     total_pruned += pruning_metrics.pruned();
                     total_matched += pruning_metrics.matched();
+                    total_fully_matched += pruning_metrics.fully_matched();
+
                     found = true;
                 }
             }
         }
 
         if found {
-            Some((total_pruned, total_matched))
+            Some(PruningMetric {
+                total_pruned,
+                total_matched,
+                total_fully_matched,
+            })
         } else {
             None
         }
@@ -172,39 +200,33 @@ impl TestOutput {
     }
 
     /// The number of row_groups pruned / matched by bloom filter
-    fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> {
+    fn row_groups_bloom_filter(&self) -> Option<PruningMetric> {
         self.pruning_metric("row_groups_pruned_bloom_filter")
     }
 
     /// The number of row_groups matched by statistics
     fn row_groups_matched_statistics(&self) -> Option<usize> {
         self.pruning_metric("row_groups_pruned_statistics")
-            .map(|(_pruned, matched)| matched)
+            .map(|pm| pm.total_matched())
     }
 
-    /*
     /// The number of row_groups fully matched by statistics
     fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
-        self.metric_value("row_groups_fully_matched_statistics")
-    }
-
-    /// The number of row groups pruned by limit pruning
-    fn limit_pruned_row_groups(&self) -> Option<usize> {
-        self.metric_value("limit_pruned_row_groups")
+        self.pruning_metric("row_groups_pruned_statistics")
+            .map(|pm| pm.total_fully_matched())
     }
-    */
 
     /// The number of row_groups pruned by statistics
     fn row_groups_pruned_statistics(&self) -> Option<usize> {
         self.pruning_metric("row_groups_pruned_statistics")
-            .map(|(pruned, _matched)| pruned)
+            .map(|pm| pm.total_pruned())
     }
 
     /// Metric `files_ranges_pruned_statistics` tracks both pruned and matched 
count,
     /// for testing purpose, here it only aggregate the `pruned` count.
     fn files_ranges_pruned_statistics(&self) -> Option<usize> {
         self.pruning_metric("files_ranges_pruned_statistics")
-            .map(|(pruned, _matched)| pruned)
+            .map(|pm| pm.total_pruned())
     }
 
     /// The number of row_groups matched by bloom filter or statistics
@@ -213,14 +235,13 @@ impl TestOutput {
     /// filter: 7 total -> 3 matched, this function returns 3 for the final 
matched
     /// count.
     fn row_groups_matched(&self) -> Option<usize> {
-        self.row_groups_bloom_filter()
-            .map(|(_pruned, matched)| matched)
+        self.row_groups_bloom_filter().map(|pm| pm.total_matched())
     }
 
     /// The number of row_groups pruned
     fn row_groups_pruned(&self) -> Option<usize> {
         self.row_groups_bloom_filter()
-            .map(|(pruned, _matched)| pruned)
+            .map(|pm| pm.total_pruned())
             .zip(self.row_groups_pruned_statistics())
             .map(|(a, b)| a + b)
     }
@@ -228,7 +249,13 @@ impl TestOutput {
     /// The number of row pages pruned
     fn row_pages_pruned(&self) -> Option<usize> {
         self.pruning_metric("page_index_rows_pruned")
-            .map(|(pruned, _matched)| pruned)
+            .map(|pm| pm.total_pruned())
+    }
+
+    /// The number of row groups pruned by limit pruning
+    fn limit_pruned_row_groups(&self) -> Option<usize> {
+        self.pruning_metric("limit_pruned_row_groups")
+            .map(|pm| pm.total_pruned())
     }
 
     fn description(&self) -> String {
@@ -247,6 +274,23 @@ impl ContextWithParquet {
         Self::with_config(scenario, unit, SessionConfig::new(), None, 
None).await
     }
 
+    /// Set custom schema and batches for the test
+    pub async fn with_custom_data(
+        scenario: Scenario,
+        unit: Unit,
+        schema: Arc<Schema>,
+        batches: Vec<RecordBatch>,
+    ) -> Self {
+        Self::with_config(
+            scenario,
+            unit,
+            SessionConfig::new(),
+            Some(schema),
+            Some(batches),
+        )
+        .await
+    }
+
     // Set custom schema and batches for the test
     /*
     pub async fn with_custom_data(
@@ -270,7 +314,7 @@ impl ContextWithParquet {
         scenario: Scenario,
         unit: Unit,
         mut config: SessionConfig,
-        custom_schema: Option<Arc<Schema>>,
+        custom_schema: Option<SchemaRef>,
         custom_batches: Option<Vec<RecordBatch>>,
     ) -> Self {
         // Use a single partition for deterministic results no matter how many 
CPUs the host has
@@ -1109,7 +1153,7 @@ fn create_data_batch(scenario: Scenario) -> 
Vec<RecordBatch> {
 async fn make_test_file_rg(
     scenario: Scenario,
     row_per_group: usize,
-    custom_schema: Option<Arc<Schema>>,
+    custom_schema: Option<SchemaRef>,
     custom_batches: Option<Vec<RecordBatch>>,
 ) -> NamedTempFile {
     let mut output_file = tempfile::Builder::new()
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs 
b/datafusion/core/tests/parquet/row_group_pruning.rs
index c5e2a4c917..360d212f69 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -18,8 +18,12 @@
 //! This file contains an end to end test of parquet pruning. It writes
 //! data into a parquet file and then verifies row groups are pruned as
 //! expected.
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch};
+use arrow_schema::{DataType, Field, Schema};
 use datafusion::prelude::SessionConfig;
-use datafusion_common::ScalarValue;
+use datafusion_common::{DataFusionError, ScalarValue};
 use itertools::Itertools;
 
 use crate::parquet::Unit::RowGroup;
@@ -30,12 +34,12 @@ struct RowGroupPruningTest {
     query: String,
     expected_errors: Option<usize>,
     expected_row_group_matched_by_statistics: Option<usize>,
-    // expected_row_group_fully_matched_by_statistics: Option<usize>,
+    expected_row_group_fully_matched_by_statistics: Option<usize>,
     expected_row_group_pruned_by_statistics: Option<usize>,
     expected_files_pruned_by_statistics: Option<usize>,
     expected_row_group_matched_by_bloom_filter: Option<usize>,
     expected_row_group_pruned_by_bloom_filter: Option<usize>,
-    // expected_limit_pruned_row_groups: Option<usize>,
+    expected_limit_pruned_row_groups: Option<usize>,
     expected_rows: usize,
 }
 impl RowGroupPruningTest {
@@ -47,11 +51,11 @@ impl RowGroupPruningTest {
             expected_errors: None,
             expected_row_group_matched_by_statistics: None,
             expected_row_group_pruned_by_statistics: None,
-            // expected_row_group_fully_matched_by_statistics: None,
+            expected_row_group_fully_matched_by_statistics: None,
             expected_files_pruned_by_statistics: None,
             expected_row_group_matched_by_bloom_filter: None,
             expected_row_group_pruned_by_bloom_filter: None,
-            // expected_limit_pruned_row_groups: None,
+            expected_limit_pruned_row_groups: None,
             expected_rows: 0,
         }
     }
@@ -81,7 +85,6 @@ impl RowGroupPruningTest {
     }
 
     // Set the expected fully matched row groups by statistics
-    /*
     fn with_fully_matched_by_stats(
         mut self,
         fully_matched_by_stats: Option<usize>,
@@ -90,12 +93,6 @@ impl RowGroupPruningTest {
         self
     }
 
-    fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) 
-> Self {
-        self.expected_limit_pruned_row_groups = pruned_by_limit;
-        self
-    }
-    */
-
     // Set the expected pruned row groups by statistics
     fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
         self.expected_row_group_pruned_by_statistics = pruned_by_stats;
@@ -119,6 +116,11 @@ impl RowGroupPruningTest {
         self
     }
 
+    fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) 
-> Self {
+        self.expected_limit_pruned_row_groups = pruned_by_limit;
+        self
+    }
+
     /// Set the number of expected rows from the output of this test
     fn with_expected_rows(mut self, rows: usize) -> Self {
         self.expected_rows = rows;
@@ -155,12 +157,12 @@ impl RowGroupPruningTest {
         );
         let bloom_filter_metrics = output.row_groups_bloom_filter();
         assert_eq!(
-            bloom_filter_metrics.map(|(_pruned, matched)| matched),
+            bloom_filter_metrics.as_ref().map(|pm| pm.total_matched()),
             self.expected_row_group_matched_by_bloom_filter,
             "mismatched row_groups_matched_bloom_filter",
         );
         assert_eq!(
-            bloom_filter_metrics.map(|(pruned, _matched)| pruned),
+            bloom_filter_metrics.map(|pm| pm.total_pruned()),
             self.expected_row_group_pruned_by_bloom_filter,
             "mismatched row_groups_pruned_bloom_filter",
         );
@@ -175,6 +177,64 @@ impl RowGroupPruningTest {
         );
     }
 
+    // Execute the test with the current configuration
+    async fn test_row_group_prune_with_custom_data(
+        self,
+        schema: Arc<Schema>,
+        batches: Vec<RecordBatch>,
+        max_row_per_group: usize,
+    ) {
+        let output = ContextWithParquet::with_custom_data(
+            self.scenario,
+            RowGroup(max_row_per_group),
+            schema,
+            batches,
+        )
+        .await
+        .query(&self.query)
+        .await;
+
+        println!("{}", output.description());
+        assert_eq!(
+            output.predicate_evaluation_errors(),
+            self.expected_errors,
+            "mismatched predicate_evaluation error"
+        );
+        assert_eq!(
+            output.row_groups_matched_statistics(),
+            self.expected_row_group_matched_by_statistics,
+            "mismatched row_groups_matched_statistics",
+        );
+        assert_eq!(
+            output.row_groups_fully_matched_statistics(),
+            self.expected_row_group_fully_matched_by_statistics,
+            "mismatched row_groups_fully_matched_statistics",
+        );
+        assert_eq!(
+            output.row_groups_pruned_statistics(),
+            self.expected_row_group_pruned_by_statistics,
+            "mismatched row_groups_pruned_statistics",
+        );
+        assert_eq!(
+            output.files_ranges_pruned_statistics(),
+            self.expected_files_pruned_by_statistics,
+            "mismatched files_ranges_pruned_statistics",
+        );
+        assert_eq!(
+            output.limit_pruned_row_groups(),
+            self.expected_limit_pruned_row_groups,
+            "mismatched limit_pruned_row_groups",
+        );
+        assert_eq!(
+            output.result_rows,
+            self.expected_rows,
+            "Expected {} rows, got {}: {}",
+            output.result_rows,
+            self.expected_rows,
+            output.description(),
+        );
+    }
+
     // Execute the test with the current configuration
     /*
     async fn test_row_group_prune_with_custom_data(
@@ -1723,7 +1783,6 @@ async fn test_bloom_filter_decimal_dict() {
         .await;
 }
 
-/*
 // Helper function to create a batch with a single Int32 column.
 fn make_i32_batch(
     name: &str,
@@ -1950,7 +2009,7 @@ async fn test_limit_pruning_exceeds_fully_matched() -> 
datafusion_common::error:
         .with_scenario(Scenario::Int)
         .with_query(query)
         .with_expected_errors(Some(0))
-        .with_expected_rows(10) // Total: 1 + 3 + 4 + 1 = 9 (less than limit)
+        .with_expected_rows(10) // Total: 1 + 4 + 4 + 1 = 10
         .with_pruned_files(Some(0))
         .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
         .with_fully_matched_by_stats(Some(2))
@@ -1958,7 +2017,5 @@ async fn test_limit_pruning_exceeds_fully_matched() -> 
datafusion_common::error:
         .with_limit_pruned_row_groups(Some(0)) // No limit pruning since we 
need all RGs
         .test_row_group_prune_with_custom_data(schema, batches, 4)
         .await;
-
     Ok(())
 }
-*/
diff --git a/datafusion/datasource-parquet/src/metrics.rs 
b/datafusion/datasource-parquet/src/metrics.rs
index c45d234f3b..db0673651f 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -44,18 +44,14 @@ pub struct ParquetFileMetrics {
     pub files_ranges_pruned_statistics: PruningMetrics,
     /// Number of times the predicate could not be evaluated
     pub predicate_evaluation_errors: Count,
-    /// Number of row groups whose bloom filters were checked, tracked with 
matched/pruned counts
+    /// Number of row groups pruned by bloom filters
     pub row_groups_pruned_bloom_filter: PruningMetrics,
-    /// Number of row groups whose statistics were checked, tracked with 
matched/pruned counts
+    /// Number of row groups pruned due to limit pruning.
+    pub limit_pruned_row_groups: PruningMetrics,
+    /// Number of row groups pruned by statistics
     pub row_groups_pruned_statistics: PruningMetrics,
     /// Number of row groups whose bloom filters were checked and matched (not 
pruned)
     pub row_groups_matched_bloom_filter: Count,
-    /// Number of row groups pruned due to limit pruning.
-    pub limit_pruned_row_groups: Count,
-    /// Number of row groups whose statistics were checked and fully matched
-    pub row_groups_fully_matched_statistics: Count,
-    /// Number of row groups whose statistics were checked and matched (not 
pruned)
-    pub row_groups_matched_statistics: Count,
     /// Total number of bytes scanned
     pub bytes_scanned: Count,
     /// Total rows filtered out by predicates pushed into parquet scan
@@ -104,15 +100,8 @@ impl ParquetFileMetrics {
 
         let limit_pruned_row_groups = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
-            .counter("limit_pruned_row_groups", partition);
-
-        let row_groups_fully_matched_statistics = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("row_groups_fully_matched_statistics", partition);
-
-        let row_groups_matched_statistics = MetricBuilder::new(metrics)
-            .with_new_label("filename", filename.to_string())
-            .counter("row_groups_matched_statistics", partition);
+            .with_type(MetricType::SUMMARY)
+            .pruning_metrics("limit_pruned_row_groups", partition);
 
         let row_groups_pruned_statistics = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
@@ -179,8 +168,6 @@ impl ParquetFileMetrics {
             predicate_evaluation_errors,
             row_groups_matched_bloom_filter,
             row_groups_pruned_bloom_filter,
-            row_groups_fully_matched_statistics,
-            row_groups_matched_statistics,
             row_groups_pruned_statistics,
             limit_pruned_row_groups,
             bytes_scanned,
diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index 5701b22ccb..a3cb112058 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -61,13 +61,15 @@ use parquet::file::metadata::{PageIndexPolicy, 
ParquetMetaDataReader};
 /// Implements [`FileOpener`] for a parquet file
 pub(super) struct ParquetOpener {
     /// Execution partition index
-    pub partition_index: usize,
+    pub(crate) partition_index: usize,
     /// Column indexes in `table_schema` needed by the query
     pub projection: Arc<[usize]>,
     /// Target number of rows in each output RecordBatch
     pub batch_size: usize,
     /// Optional limit on the number of rows to read
-    pub limit: Option<usize>,
+    pub(crate) limit: Option<usize>,
+    /// If should keep the output rows in order
+    pub preserve_order: bool,
     /// Optional predicate to apply during the scan
     pub predicate: Option<Arc<dyn PhysicalExpr>>,
     /// Schema of the output table without partition columns.
@@ -99,8 +101,6 @@ pub(super) struct ParquetOpener {
     pub enable_row_group_stats_pruning: bool,
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
-    /// Should limit pruning be applied
-    pub enable_limit_pruning: bool,
     /// Optional parquet FileDecryptionProperties
     #[cfg(feature = "parquet_encryption")]
     pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -153,7 +153,6 @@ impl FileOpener for ParquetOpener {
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = 
self.enable_row_group_stats_pruning;
         let limit = self.limit;
-        let enable_limit_pruning = self.enable_limit_pruning;
 
         let predicate_creation_errors = MetricBuilder::new(&self.metrics)
             .global_counter("num_predicate_creation_errors");
@@ -165,6 +164,7 @@ impl FileOpener for ParquetOpener {
         #[cfg(feature = "parquet_encryption")]
         let encryption_context = self.get_encryption_context();
         let max_predicate_cache_size = self.max_predicate_cache_size;
+        let preserve_order = self.preserve_order;
 
         Ok(Box::pin(async move {
             #[cfg(feature = "parquet_encryption")]
@@ -410,14 +410,13 @@ impl FileOpener for ParquetOpener {
                     .add_matched(n_remaining_row_groups);
             }
 
-            // Prune by limit
-            if enable_limit_pruning {
-                if let Some(limit) = limit {
-                    row_groups.prune_by_limit(limit, rg_metadata, 
&file_metrics);
-                }
+            // Prune by limit if limit is set and limit order is not sensitive
+            if let (Some(limit), false) = (limit, preserve_order) {
+                row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
             }
 
             let mut access_plan = row_groups.build();
+
             // page index pruning: if all data on individual pages can
             // be ruled using page metadata, rows from other columns
             // with that range can be skipped as well
@@ -886,6 +885,7 @@ mod test {
                 projection: Arc::new([0, 1]),
                 batch_size: 1024,
                 limit: None,
+                preserve_order: false,
                 predicate: Some(predicate),
                 logical_file_schema: schema.clone(),
                 metadata_size_hint: None,
@@ -898,7 +898,6 @@ mod test {
                 reorder_filters: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                enable_limit_pruning: false,
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
@@ -956,6 +955,7 @@ mod test {
                 projection: Arc::new([0]),
                 batch_size: 1024,
                 limit: None,
+                preserve_order: false,
                 predicate: Some(predicate),
                 logical_file_schema: file_schema.clone(),
                 metadata_size_hint: None,
@@ -972,7 +972,6 @@ mod test {
                 reorder_filters: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                enable_limit_pruning: false,
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
@@ -1046,6 +1045,7 @@ mod test {
                 projection: Arc::new([0]),
                 batch_size: 1024,
                 limit: None,
+                preserve_order: false,
                 predicate: Some(predicate),
                 logical_file_schema: file_schema.clone(),
                 metadata_size_hint: None,
@@ -1062,7 +1062,6 @@ mod test {
                 reorder_filters: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                enable_limit_pruning: false,
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
@@ -1139,6 +1138,7 @@ mod test {
                 projection: Arc::new([0]),
                 batch_size: 1024,
                 limit: None,
+                preserve_order: false,
                 predicate: Some(predicate),
                 logical_file_schema: file_schema.clone(),
                 metadata_size_hint: None,
@@ -1155,7 +1155,6 @@ mod test {
                 reorder_filters: true,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                enable_limit_pruning: false,
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: false, // note that this is 
false!
                 coerce_int96: None,
@@ -1232,6 +1231,7 @@ mod test {
                 projection: Arc::new([0]),
                 batch_size: 1024,
                 limit: None,
+                preserve_order: false,
                 predicate: Some(predicate),
                 logical_file_schema: file_schema.clone(),
                 metadata_size_hint: None,
@@ -1248,7 +1248,6 @@ mod test {
                 reorder_filters: false,
                 enable_page_index: false,
                 enable_bloom_filter: false,
-                enable_limit_pruning: false,
                 schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                 enable_row_group_stats_pruning: true,
                 coerce_int96: None,
@@ -1387,6 +1386,7 @@ mod test {
             projection: Arc::new([0, 1]),
             batch_size: 1024,
             limit: None,
+            preserve_order: false,
             predicate: Some(predicate),
             logical_file_schema: Arc::clone(&table_schema),
             metadata_size_hint: None,
@@ -1399,7 +1399,6 @@ mod test {
             reorder_filters: false,
             enable_page_index: false,
             enable_bloom_filter: false,
-            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
             enable_row_group_stats_pruning: false,
             coerce_int96: None,
diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs 
b/datafusion/datasource-parquet/src/row_group_filter.rs
index f0d483ba35..6004da8ae6 100644
--- a/datafusion/datasource-parquet/src/row_group_filter.rs
+++ b/datafusion/datasource-parquet/src/row_group_filter.rs
@@ -48,7 +48,8 @@ use parquet::{
 pub struct RowGroupAccessPlanFilter {
     /// which row groups should be accessed
     access_plan: ParquetAccessPlan,
-    /// which row groups are fully contained within the pruning predicate
+    /// Row groups where ALL rows are known to match the pruning predicate
+    /// (the predicate does not filter any rows)
     is_fully_matched: Vec<bool>,
 }
 
@@ -84,6 +85,93 @@ impl RowGroupAccessPlanFilter {
     }
 
     /// Prunes the access plan based on the limit and fully contained row 
groups.
+    ///
+    /// The pruning works by leveraging the concept of fully matched row 
groups. Consider a query like:
+    /// `WHERE species LIKE 'Alpine%' AND s >= 50 LIMIT N`
+    ///
+    /// After initial filtering, row groups can be classified into three 
states:
+    ///
+    /// 1. Not Matching / Pruned
+    /// 2. Partially Matching (Row Group/Page contains some matches)
+    /// 3. Fully Matching (Entire range is within predicate)
+    ///
+    /// 
+-----------------------------------------------------------------------+
+    /// |                            NOT MATCHING                              
 |
+    /// |  Row group 1                                                         
 |
+    /// |  +-----------------------------------+-----------------------------+ 
 |
+    /// |  | SPECIES                           | S                           | 
 |
+    /// |  +-----------------------------------+-----------------------------+ 
 |
+    /// |  | Snow Vole                         | 7                           | 
 |
+    /// |  | Brown Bear                        | 133 ✅                      |  
|
+    /// |  | Gray Wolf                         | 82  ✅                      |  
|
+    /// |  +-----------------------------------+-----------------------------+ 
 |
+    /// 
+-----------------------------------------------------------------------+
+    ///
+    /// 
+---------------------------------------------------------------------------+
+    /// |                          PARTIALLY MATCHING                          
     |
+    /// |                                                                      
     |
+    /// |  Row group 2                              Row group 4                
     |
+    /// |  +------------------+--------------+      
+------------------+----------+ |
+    /// |  | SPECIES          | S            |      | SPECIES          | S     
   | |
+    /// |  +------------------+--------------+      
+------------------+----------+ |
+    /// |  | Lynx             | 71 ✅        |      | Europ. Mole      | 4      
  | |
+    /// |  | Red Fox          | 40           |      | Polecat          | 16    
   | |
+    /// |  | Alpine Bat  ✅   | 6            |      | Alpine Ibex ✅  | 97 ✅    
| |
+    /// |  +------------------+--------------+      
+------------------+----------+ |
+    /// 
+---------------------------------------------------------------------------+
+    ///
+    /// 
+-----------------------------------------------------------------------+
+    /// |                           FULLY MATCHING                             
 |
+    /// |  Row group 3                                                         
 |
+    /// |  +-----------------------------------+-----------------------------+ 
 |
+    /// |  | SPECIES                           | S                           | 
 |
+    /// |  +-----------------------------------+-----------------------------+ 
 |
+    /// |  | Alpine Ibex  ✅                  | 101    ✅                   |  |
+    /// |  | Alpine Goat  ✅                  | 76     ✅                   |  |
+    /// |  | Alpine Sheep ✅                  | 83     ✅                   |  |
+    /// |  +-----------------------------------+-----------------------------+ 
 |
+    /// 
+-----------------------------------------------------------------------+
+    ///
+    /// ### Identification of Fully Matching Row Groups
+    ///
+    /// DataFusion identifies row groups where ALL rows satisfy the filter by 
inverting the
+    /// predicate and checking if statistics prove the inverted version is 
false for the group.
+    ///
+    /// For example, prefix matches like `species LIKE 'Alpine%'` are pruned 
using ranges:
+    /// 1. Candidate Range: `species >= 'Alpine' AND species < 'Alpinf'`
+    /// 2. Inverted Condition (to prove full match): `species < 'Alpine' OR 
species >= 'Alpinf'`
+    /// 3. Statistical Evaluation (check if any row *could* satisfy the 
inverted condition):
+    ///    `min < 'Alpine' OR max >= 'Alpinf'`
+    ///
+    /// If this evaluation is **false**, it proves no row can fail the 
original filter,
+    /// so the row group is **FULLY MATCHING**.
+    ///
+    /// ### Impact of Statistics Truncation
+    ///
+    /// The precision of pruning depends on the metadata quality. Truncated 
statistics
+    /// may prevent the system from proving a full match.
+    ///
+    /// **Example**: `WHERE species LIKE 'Alpine%'` (Target range: `['Alpine', 
'Alpinf')`)
+    ///
+    /// | Truncation Length | min / max           | Inverted Evaluation        
                                         | Status                 |
+    /// 
|-------------------|---------------------|---------------------------------------------------------------------|------------------------|
+    /// | **Length 6**      | `Alpine` / `Alpine` | `"Alpine" < "Alpine" (F) 
OR "Alpine" >= "Alpinf" (F)` -> **false**  | **FULLY MATCHING**     |
+    /// | **Length 3**      | `Alp` / `Alq`       | `"Alp" < "Alpine" (T) OR 
"Alq" >= "Alpinf" (T)` -> **true**         | **PARTIALLY MATCHING** |
+    ///
+    /// Even though Row Group 3 only contains matching rows, truncation to 
length 3 makes
+    /// the statistics `[Alp, Alq]` too broad to prove it (they could include 
"Alpha").
+    /// The system must conservatively scan the group.
+    ///
+    /// Without limit pruning: Scan Partition 2 → Partition 3 → Partition 4 
(until limit reached)
+    /// With limit pruning: If Partition 3 contains enough rows to satisfy the 
limit,
+    /// skip Partitions 2 and 4 entirely and go directly to Partition 3.
+    ///
+    /// This optimization is particularly effective when:
+    /// - The limit is small relative to the total dataset size
+    /// - There are row groups that are fully matched by the filter predicates
+    /// - The fully matched row groups contain sufficient rows to satisfy the 
limit
+    ///
+    /// For more information, see the 
[paper](https://arxiv.org/pdf/2504.11540)'s "Pruning for LIMIT Queries" part
     pub fn prune_by_limit(
         &mut self,
         limit: usize,
@@ -93,7 +181,8 @@ impl RowGroupAccessPlanFilter {
         let mut fully_matched_row_group_indexes: Vec<usize> = Vec::new();
         let mut fully_matched_rows_count: usize = 0;
 
-        // Iterate through the currently accessible row groups
+        // Iterate through the currently accessible row groups and try to
+        // find a set of matching row groups that can satisfy the limit
         for &idx in self.access_plan.row_group_indexes().iter() {
             if self.is_fully_matched[idx] {
                 let row_group_row_count = rg_metadata[idx].num_rows() as usize;
@@ -105,13 +194,15 @@ impl RowGroupAccessPlanFilter {
             }
         }
 
+        // If we can satisfy the limit with fully matching row groups,
+        // rewrite the plan to do so
         if fully_matched_rows_count >= limit {
             let original_num_accessible_row_groups =
                 self.access_plan.row_group_indexes().len();
             let new_num_accessible_row_groups = 
fully_matched_row_group_indexes.len();
             let pruned_count = original_num_accessible_row_groups
                 .saturating_sub(new_num_accessible_row_groups);
-            metrics.limit_pruned_row_groups.add(pruned_count);
+            metrics.limit_pruned_row_groups.add_pruned(pruned_count);
 
             let mut new_access_plan = 
ParquetAccessPlan::new_none(rg_metadata.len());
             for &idx in &fully_matched_row_group_indexes {
@@ -197,45 +288,15 @@ impl RowGroupAccessPlanFilter {
                     }
                 }
 
-                // Note: this part of code shouldn't be expensive with a 
limited number of row groups
-                // If we do find it's expensive, we can consider optimizing it 
further.
-                if !fully_contained_candidates_original_idx.is_empty() {
-                    // Use NotExpr to create the inverted predicate
-                    let inverted_expr =
-                        
Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
-                    // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 
0)
-                    // before building the pruning predicate
-                    let mut simplifier = 
PhysicalExprSimplifier::new(arrow_schema);
-                    let inverted_expr = 
simplifier.simplify(inverted_expr).unwrap();
-                    if let Ok(inverted_predicate) = PruningPredicate::try_new(
-                        inverted_expr,
-                        Arc::clone(predicate.schema()),
-                    ) {
-                        let inverted_pruning_stats = RowGroupPruningStatistics 
{
-                            parquet_schema,
-                            row_group_metadatas: 
fully_contained_candidates_original_idx
-                                .iter()
-                                .map(|&i| &groups[i])
-                                .collect::<Vec<_>>(),
-                            arrow_schema,
-                        };
-
-                        if let Ok(inverted_values) =
-                            inverted_predicate.prune(&inverted_pruning_stats)
-                        {
-                            for (i, &original_row_group_idx) in
-                                
fully_contained_candidates_original_idx.iter().enumerate()
-                            {
-                                // If the inverted predicate *also* prunes 
this row group (meaning inverted_values[i] is false),
-                                // it implies that *all* rows in this group 
satisfy the original predicate.
-                                if !inverted_values[i] {
-                                    
self.is_fully_matched[original_row_group_idx] = true;
-                                    
metrics.row_groups_fully_matched_statistics.add(1);
-                                }
-                            }
-                        }
-                    }
-                }
+                // Check if any of the matched row groups are fully contained 
by the predicate
+                self.identify_fully_matched_row_groups(
+                    &fully_contained_candidates_original_idx,
+                    arrow_schema,
+                    parquet_schema,
+                    groups,
+                    predicate,
+                    metrics,
+                );
             }
             // stats filter array could not be built, so we can't prune
             Err(e) => {
@@ -245,6 +306,68 @@ impl RowGroupAccessPlanFilter {
         }
     }
 
+    /// Identifies row groups that are fully matched by the predicate.
+    ///
+    /// This optimization checks whether all rows in a row group satisfy the 
predicate
+    /// by inverting the predicate and checking if it prunes the row group. If 
the
+    /// inverted predicate prunes a row group, it means no rows match the 
inverted
+    /// predicate, which implies all rows match the original predicate.
+    ///
+    /// Note: This optimization is relatively inexpensive for a limited number 
of row groups.
+    fn identify_fully_matched_row_groups(
+        &mut self,
+        candidate_row_group_indices: &[usize],
+        arrow_schema: &Schema,
+        parquet_schema: &SchemaDescriptor,
+        groups: &[RowGroupMetaData],
+        predicate: &PruningPredicate,
+        metrics: &ParquetFileMetrics,
+    ) {
+        if candidate_row_group_indices.is_empty() {
+            return;
+        }
+
+        // Use NotExpr to create the inverted predicate
+        let inverted_expr = 
Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
+
+        // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
+        // before building the pruning predicate
+        let mut simplifier = PhysicalExprSimplifier::new(arrow_schema);
+        let Ok(inverted_expr) = simplifier.simplify(inverted_expr) else {
+            return;
+        };
+
+        let Ok(inverted_predicate) =
+            PruningPredicate::try_new(inverted_expr, 
Arc::clone(predicate.schema()))
+        else {
+            return;
+        };
+
+        let inverted_pruning_stats = RowGroupPruningStatistics {
+            parquet_schema,
+            row_group_metadatas: candidate_row_group_indices
+                .iter()
+                .map(|&i| &groups[i])
+                .collect::<Vec<_>>(),
+            arrow_schema,
+        };
+
+        let Ok(inverted_values) = 
inverted_predicate.prune(&inverted_pruning_stats)
+        else {
+            return;
+        };
+
+        for (i, &original_row_group_idx) in 
candidate_row_group_indices.iter().enumerate()
+        {
+            // If the inverted predicate *also* prunes this row group (meaning 
inverted_values[i] is false),
+            // it implies that *all* rows in this group satisfy the original 
predicate.
+            if !inverted_values[i] {
+                self.is_fully_matched[original_row_group_idx] = true;
+                metrics.row_groups_pruned_statistics.add_fully_matched(1);
+            }
+        }
+    }
+
     /// Prune remaining row groups using available bloom filters and the
     /// [`PruningPredicate`].
     ///
diff --git a/datafusion/datasource-parquet/src/source.rs 
b/datafusion/datasource-parquet/src/source.rs
index 339d36b57c..ae1cab83ba 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -564,6 +564,7 @@ impl FileSource for ParquetSource {
                 .batch_size
                 .expect("Batch size must set before creating ParquetOpener"),
             limit: base_config.limit,
+            preserve_order: base_config.preserve_order,
             predicate: self.predicate.clone(),
             logical_file_schema: Arc::clone(base_config.file_schema()),
             partition_fields: base_config.table_partition_cols().clone(),
@@ -575,7 +576,6 @@ impl FileSource for ParquetSource {
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             enable_row_group_stats_pruning: 
self.table_parquet_options.global.pruning,
-            enable_limit_pruning: base_config.limit_pruning,
             schema_adapter_factory,
             coerce_int96,
             #[cfg(feature = "parquet_encryption")]
diff --git a/datafusion/datasource/src/file_scan_config.rs 
b/datafusion/datasource/src/file_scan_config.rs
index 02d9762a4a..20b4370772 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -187,6 +187,11 @@ pub struct FileScanConfig {
     /// The maximum number of records to read from this plan. If `None`,
     /// all records after filtering are returned.
     pub limit: Option<usize>,
+    /// Whether the scan's limit is order sensitive
+    /// When `true`, files must be read in the exact order specified to produce
+    /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
+    /// DataFusion may reorder file processing for optimization without 
affecting correctness.
+    pub preserve_order: bool,
     /// All equivalent lexicographical orderings that describe the schema.
     pub output_ordering: Vec<LexOrdering>,
     /// File compression type
@@ -268,8 +273,9 @@ pub struct FileScanConfigBuilder {
     /// [`DataSourceExec`]: crate::source::DataSourceExec
     table_schema: TableSchema,
     file_source: Arc<dyn FileSource>,
-    limit: Option<usize>,
     projection_indices: Option<Vec<usize>>,
+    limit: Option<usize>,
+    preserve_order: bool,
     constraints: Option<Constraints>,
     file_groups: Vec<FileGroup>,
     statistics: Option<Statistics>,
@@ -305,6 +311,7 @@ impl FileScanConfigBuilder {
             new_lines_in_values: None,
             limit: None,
             projection_indices: None,
+            preserve_order: false,
             constraints: None,
             batch_size: None,
             expr_adapter_factory: None,
@@ -319,6 +326,15 @@ impl FileScanConfigBuilder {
         self
     }
 
+    /// Set whether the limit should be order-sensitive.
+    /// When `true`, files must be read in the exact order specified to produce
+    /// correct results (e.g., for `ORDER BY ... LIMIT` queries). When `false`,
+    /// DataFusion may reorder file processing for optimization without 
affecting correctness.
+    pub fn with_preserve_order(mut self, order_sensitive: bool) -> Self {
+        self.preserve_order = order_sensitive;
+        self
+    }
+
     /// Set the file source for scanning files.
     ///
     /// This method allows you to change the file source implementation (e.g. 
ParquetSource, CsvSource, etc.)
@@ -466,6 +482,7 @@ impl FileScanConfigBuilder {
             file_source,
             limit,
             projection_indices,
+            preserve_order,
             constraints,
             file_groups,
             statistics,
@@ -494,12 +511,16 @@ impl FileScanConfigBuilder {
             ProjectionExprs::from_indices(&indices, 
table_schema.table_schema())
         });
 
+        // If there is an output ordering, we should preserve it.
+        let preserve_order = preserve_order || !output_ordering.is_empty();
+
         FileScanConfig {
             object_store_url,
             table_schema,
             file_source,
             limit,
             projection_exprs,
+            preserve_order,
             constraints,
             file_groups,
             output_ordering,
@@ -514,6 +535,7 @@ impl FileScanConfigBuilder {
 
 impl From<FileScanConfig> for FileScanConfigBuilder {
     fn from(config: FileScanConfig) -> Self {
+        let projection_indices = config.projection_indices();
         Self {
             object_store_url: config.object_store_url,
             table_schema: config.table_schema,
@@ -524,9 +546,8 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             file_compression_type: Some(config.file_compression_type),
             new_lines_in_values: Some(config.new_lines_in_values),
             limit: config.limit,
-            projection_indices: config
-                .projection_exprs
-                .map(|p| p.ordered_column_indices()),
+            projection_indices: Some(projection_indices),
+            preserve_order: config.preserve_order,
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
@@ -757,6 +778,18 @@ impl DataSource for FileScanConfig {
             }
         }
     }
+
+    fn with_preserve_order(&self, preserve_order: bool) -> Option<Arc<dyn 
DataSource>> {
+        if self.preserve_order == preserve_order {
+            return Some(Arc::new(self.clone()));
+        }
+
+        let new_config = FileScanConfig {
+            preserve_order,
+            ..self.clone()
+        };
+        Some(Arc::new(new_config))
+    }
 }
 
 impl FileScanConfig {
diff --git a/datafusion/datasource/src/source.rs 
b/datafusion/datasource/src/source.rs
index 7169997bd0..9227647bd0 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -189,6 +189,11 @@ pub trait DataSource: Send + Sync + Debug {
             vec![PushedDown::No; filters.len()],
         ))
     }
+
+    /// Returns a variant of this `DataSource` that is aware of 
order-sensitivity.
+    fn with_preserve_order(&self, _preserve_order: bool) -> Option<Arc<dyn 
DataSource>> {
+        None
+    }
 }
 
 /// [`ExecutionPlan`] that reads one or more files
@@ -371,6 +376,18 @@ impl ExecutionPlan for DataSourceExec {
             }),
         }
     }
+
+    fn with_preserve_order(
+        &self,
+        preserve_order: bool,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        self.data_source
+            .with_preserve_order(preserve_order)
+            .map(|new_data_source| {
+                Arc::new(self.clone().with_data_source(new_data_source))
+                    as Arc<dyn ExecutionPlan>
+            })
+    }
 }
 
 impl DataSourceExec {
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs 
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 5db71417bc..59a9d703f6 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -268,15 +268,10 @@ fn optimize_projections(
                 Some(projection) => indices.into_mapped_indices(|idx| 
projection[idx]),
                 None => indices.into_inner(),
             };
-            return TableScan::try_new(
-                table_name,
-                source,
-                Some(projection),
-                filters,
-                fetch,
-            )
-            .map(LogicalPlan::TableScan)
-            .map(Transformed::yes);
+            let new_scan =
+                TableScan::try_new(table_name, source, Some(projection), 
filters, fetch)?;
+
+            return Ok(Transformed::yes(LogicalPlan::TableScan(new_scan)));
         }
         // Other node types are handled below
         _ => {}
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs 
b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
index 28d187bbf8..a42abad1dc 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/mod.rs
@@ -584,11 +584,17 @@ fn analyze_immediate_sort_removal(
         // Remove the sort:
         node.children = node.children.swap_remove(0).children;
         if let Some(fetch) = sort_exec.fetch() {
+            let required_ordering = 
sort_exec.properties().output_ordering().cloned();
             // If the sort has a fetch, we need to add a limit:
             if properties.output_partitioning().partition_count() == 1 {
-                Arc::new(GlobalLimitExec::new(Arc::clone(sort_input), 0, 
Some(fetch)))
+                let mut global_limit =
+                    GlobalLimitExec::new(Arc::clone(sort_input), 0, 
Some(fetch));
+                global_limit.set_required_ordering(required_ordering);
+                Arc::new(global_limit)
             } else {
-                Arc::new(LocalLimitExec::new(Arc::clone(sort_input), fetch))
+                let mut local_limit = 
LocalLimitExec::new(Arc::clone(sort_input), fetch);
+                local_limit.set_required_ordering(required_ordering);
+                Arc::new(local_limit)
             }
         } else {
             Arc::clone(sort_input)
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs 
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 7469c3af93..e0079578a3 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -50,6 +50,7 @@ pub struct GlobalRequirements {
     fetch: Option<usize>,
     skip: usize,
     satisfied: bool,
+    preserve_order: bool,
 }
 
 impl LimitPushdown {
@@ -69,6 +70,7 @@ impl PhysicalOptimizerRule for LimitPushdown {
             fetch: None,
             skip: 0,
             satisfied: false,
+            preserve_order: false,
         };
         pushdown_limits(plan, global_state)
     }
@@ -111,6 +113,13 @@ impl LimitExec {
             Self::Local(_) => 0,
         }
     }
+
+    fn preserve_order(&self) -> bool {
+        match self {
+            Self::Global(global) => global.required_ordering().is_some(),
+            Self::Local(local) => local.required_ordering().is_some(),
+        }
+    }
 }
 
 impl From<LimitExec> for Arc<dyn ExecutionPlan> {
@@ -145,6 +154,7 @@ pub fn pushdown_limit_helper(
         );
         global_state.skip = skip;
         global_state.fetch = fetch;
+        global_state.preserve_order = limit_exec.preserve_order();
 
         // Now the global state has the most recent information, we can remove
         // the `LimitExec` plan. We will decide later if we should add it again
@@ -241,17 +251,28 @@ pub fn pushdown_limit_helper(
         let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
         if global_state.satisfied {
             if let Some(plan_with_fetch) = maybe_fetchable {
-                Ok((Transformed::yes(plan_with_fetch), global_state))
+                let plan_with_preserve_order = plan_with_fetch
+                    .with_preserve_order(global_state.preserve_order)
+                    .unwrap_or(plan_with_fetch);
+                Ok((Transformed::yes(plan_with_preserve_order), global_state))
             } else {
                 Ok((Transformed::no(pushdown_plan), global_state))
             }
         } else {
             global_state.satisfied = true;
             pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
+                let plan_with_preserve_order = plan_with_fetch
+                    .with_preserve_order(global_state.preserve_order)
+                    .unwrap_or(plan_with_fetch);
+
                 if global_skip > 0 {
-                    add_global_limit(plan_with_fetch, global_skip, 
Some(global_fetch))
+                    add_global_limit(
+                        plan_with_preserve_order,
+                        global_skip,
+                        Some(global_fetch),
+                    )
                 } else {
-                    plan_with_fetch
+                    plan_with_preserve_order
                 }
             } else {
                 add_limit(pushdown_plan, global_skip, global_fetch)
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs 
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 31e5a7369c..ebf97cee58 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -284,6 +284,19 @@ impl ExecutionPlan for CoalescePartitionsExec {
         }))
     }
 
+    fn with_preserve_order(
+        &self,
+        preserve_order: bool,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        self.input
+            .with_preserve_order(preserve_order)
+            .and_then(|new_input| {
+                Arc::new(self.clone())
+                    .with_new_children(vec![new_input])
+                    .ok()
+            })
+    }
+
     fn gather_filters_for_pushdown(
         &self,
         _phase: FilterPushdownPhase,
diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs
index 340e5662ce..35866d1cfe 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -688,6 +688,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
+
+    /// Returns a variant of this `ExecutionPlan` that is aware of 
order-sensitivity.
+    ///
+    /// This is used to signal to data sources that the output ordering must be
+    /// preserved, even if it might be more efficient to ignore it (e.g. by
+    /// skipping some row groups in Parquet).
+    ///
+    fn with_preserve_order(
+        &self,
+        _preserve_order: bool,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        None
+    }
 }
 
 /// [`ExecutionPlan`] Invariant Level
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 2ee7d283d8..c86dd2ff16 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -570,6 +570,19 @@ impl ExecutionPlan for FilterExec {
             updated_node,
         })
     }
+
+    fn with_preserve_order(
+        &self,
+        preserve_order: bool,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        self.input
+            .with_preserve_order(preserve_order)
+            .and_then(|new_input| {
+                Arc::new(self.clone())
+                    .with_new_children(vec![new_input])
+                    .ok()
+            })
+    }
 }
 
 impl EmbeddedProjection for FilterExec {
diff --git a/datafusion/physical-plan/src/limit.rs 
b/datafusion/physical-plan/src/limit.rs
index 5bdd697b35..fecb70be32 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -35,6 +35,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 
+use datafusion_physical_expr::LexOrdering;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
@@ -51,6 +52,9 @@ pub struct GlobalLimitExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     cache: PlanProperties,
+    /// Does the limit have to preserve the order of its input, and if so what 
is it?
+    /// Some optimizations may reorder the input if no particular sort is 
required
+    required_ordering: Option<LexOrdering>,
 }
 
 impl GlobalLimitExec {
@@ -63,6 +67,7 @@ impl GlobalLimitExec {
             fetch,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            required_ordering: None,
         }
     }
 
@@ -91,6 +96,16 @@ impl GlobalLimitExec {
             Boundedness::Bounded,
         )
     }
+
+    /// Get the required ordering from limit
+    pub fn required_ordering(&self) -> &Option<LexOrdering> {
+        &self.required_ordering
+    }
+
+    /// Set the required ordering for limit
+    pub fn set_required_ordering(&mut self, required_ordering: 
Option<LexOrdering>) {
+        self.required_ordering = required_ordering;
+    }
 }
 
 impl DisplayAs for GlobalLimitExec {
@@ -230,6 +245,9 @@ pub struct LocalLimitExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     cache: PlanProperties,
+    /// If the child plan is a sort node, after the sort node is removed during
+    /// physical optimization, we should add the required ordering to the 
limit node
+    required_ordering: Option<LexOrdering>,
 }
 
 impl LocalLimitExec {
@@ -241,6 +259,7 @@ impl LocalLimitExec {
             fetch,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            required_ordering: None,
         }
     }
 
@@ -264,6 +283,16 @@ impl LocalLimitExec {
             Boundedness::Bounded,
         )
     }
+
+    /// Get the required ordering from limit
+    pub fn required_ordering(&self) -> &Option<LexOrdering> {
+        &self.required_ordering
+    }
+
+    /// Set the required ordering for limit
+    pub fn set_required_ordering(&mut self, required_ordering: 
Option<LexOrdering>) {
+        self.required_ordering = required_ordering;
+    }
 }
 
 impl DisplayAs for LocalLimitExec {
diff --git a/datafusion/physical-plan/src/metrics/value.rs 
b/datafusion/physical-plan/src/metrics/value.rs
index 298d63e5e2..b66119b869 100644
--- a/datafusion/physical-plan/src/metrics/value.rs
+++ b/datafusion/physical-plan/src/metrics/value.rs
@@ -372,14 +372,23 @@ impl Drop for ScopedTimerGuard<'_> {
 pub struct PruningMetrics {
     pruned: Arc<AtomicUsize>,
     matched: Arc<AtomicUsize>,
+    fully_matched: Arc<AtomicUsize>,
 }
 
 impl Display for PruningMetrics {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         let matched = self.matched.load(Ordering::Relaxed);
         let total = self.pruned.load(Ordering::Relaxed) + matched;
+        let fully_matched = self.fully_matched.load(Ordering::Relaxed);
 
-        write!(f, "{total} total → {matched} matched")
+        if fully_matched != 0 {
+            write!(
+                f,
+                "{total} total → {matched} matched -> {fully_matched} fully 
matched",
+            )
+        } else {
+            write!(f, "{total} total → {matched} matched")
+        }
     }
 }
 
@@ -395,6 +404,7 @@ impl PruningMetrics {
         Self {
             pruned: Arc::new(AtomicUsize::new(0)),
             matched: Arc::new(AtomicUsize::new(0)),
+            fully_matched: Arc::new(AtomicUsize::new(0)),
         }
     }
 
@@ -412,6 +422,13 @@ impl PruningMetrics {
         self.matched.fetch_add(n, Ordering::Relaxed);
     }
 
+    /// Add `n` to the metric's fully matched value
+    pub fn add_fully_matched(&self, n: usize) {
+        // relaxed ordering for operations on `value` poses no issues
+        // we're purely using atomic ops with no associated memory ops
+        self.fully_matched.fetch_add(n, Ordering::Relaxed);
+    }
+
     /// Subtract `n` to the metric's matched value.
     pub fn subtract_matched(&self, n: usize) {
         // relaxed ordering for operations on `value` poses no issues
@@ -428,6 +445,11 @@ impl PruningMetrics {
     pub fn matched(&self) -> usize {
         self.matched.load(Ordering::Relaxed)
     }
+
+    /// Number of items fully matched
+    pub fn fully_matched(&self) -> usize {
+        self.fully_matched.load(Ordering::Relaxed)
+    }
 }
 
 /// Counters tracking ratio metrics (e.g. matched vs total)
@@ -842,8 +864,11 @@ impl MetricValue {
             ) => {
                 let pruned = 
other_pruning_metrics.pruned.load(Ordering::Relaxed);
                 let matched = 
other_pruning_metrics.matched.load(Ordering::Relaxed);
+                let fully_matched =
+                    
other_pruning_metrics.fully_matched.load(Ordering::Relaxed);
                 pruning_metrics.add_pruned(pruned);
                 pruning_metrics.add_matched(matched);
+                pruning_metrics.add_fully_matched(fully_matched);
             }
             (
                 Self::Ratio { ratio_metrics, .. },
diff --git a/datafusion/physical-plan/src/projection.rs 
b/datafusion/physical-plan/src/projection.rs
index cfdaa4e9d9..4fe051dbac 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -346,6 +346,19 @@ impl ExecutionPlan for ProjectionExec {
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
         Ok(FilterPushdownPropagation::if_all(child_pushdown_result))
     }
+
+    fn with_preserve_order(
+        &self,
+        preserve_order: bool,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        self.input
+            .with_preserve_order(preserve_order)
+            .and_then(|new_input| {
+                Arc::new(self.clone())
+                    .with_new_children(vec![new_input])
+                    .ok()
+            })
+    }
 }
 
 impl ProjectionStream {
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs 
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 11f42c8153..150f1e60cb 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -245,6 +245,19 @@ impl ExecutionPlan for SortPreservingMergeExec {
         }))
     }
 
+    fn with_preserve_order(
+        &self,
+        preserve_order: bool,
+    ) -> Option<Arc<dyn ExecutionPlan>> {
+        self.input
+            .with_preserve_order(preserve_order)
+            .and_then(|new_input| {
+                Arc::new(self.clone())
+                    .with_new_children(vec![new_input])
+                    .ok()
+            })
+    }
+
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![Distribution::UnspecifiedDistribution]
     }
diff --git a/datafusion/sqllogictest/README.md 
b/datafusion/sqllogictest/README.md
index a389ae1ef6..8768deee3d 100644
--- a/datafusion/sqllogictest/README.md
+++ b/datafusion/sqllogictest/README.md
@@ -142,6 +142,17 @@ select substr('Andrew Lamb', 1, 6), '|'
 Andrew |
 ```
 
+## Cookbook: Ignoring volatile output
+
+Sometimes parts of a result change every run (timestamps, counters, etc.). To 
keep the rest of the snapshot checked in, replace those fragments with the 
`<slt:ignore>` marker inside the expected block. During validation the marker 
acts like a wildcard, so only the surrounding text must match.
+
+```text
+query TT
+EXPLAIN ANALYZE SELECT * FROM generate_series(100);
+----
+Plan with Metrics LazyMemoryExec: partitions=1, 
batch_generators=[generate_series: start=0, end=100, batch_size=8192], 
metrics=[output_rows=101, elapsed_compute=<slt:ignore>, 
output_bytes=<slt:ignore>]
+```
+
 # Reference
 
 ## Running tests: Validation Mode
diff --git a/datafusion/sqllogictest/src/util.rs 
b/datafusion/sqllogictest/src/util.rs
index 695fe463fa..721c559ab1 100644
--- a/datafusion/sqllogictest/src/util.rs
+++ b/datafusion/sqllogictest/src/util.rs
@@ -82,6 +82,10 @@ pub fn df_value_validator(
     actual: &[Vec<String>],
     expected: &[String],
 ) -> bool {
+    // Support ignore marker <slt:ignore> to skip volatile parts of output.
+    const IGNORE_MARKER: &str = "<slt:ignore>";
+    let contains_ignore_marker = expected.iter().any(|line| 
line.contains(IGNORE_MARKER));
+
     let normalized_expected = 
expected.iter().map(normalizer).collect::<Vec<_>>();
     let normalized_actual = actual
         .iter()
@@ -89,6 +93,32 @@ pub fn df_value_validator(
         .map(|str| str.trim_end().to_string())
         .collect_vec();
 
+    // If ignore marker present, perform fragment-based matching on the full 
snapshot.
+    if contains_ignore_marker {
+        let expected_snapshot = normalized_expected.join("\n");
+        let actual_snapshot = normalized_actual.join("\n");
+        let fragments: Vec<&str> = 
expected_snapshot.split(IGNORE_MARKER).collect();
+        let mut pos = 0;
+        for (i, frag) in fragments.iter().enumerate() {
+            if frag.is_empty() {
+                continue;
+            }
+            if let Some(idx) = actual_snapshot[pos..].find(frag) {
+                // Edge case: The following example is expected to fail
+                // Actual - 'foo bar baz'
+                // Expected - 'bar <slt:ignore>'
+                if (i == 0) && (idx != 0) {
+                    return false;
+                }
+
+                pos += idx + frag.len();
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
     if log_enabled!(Warn) && normalized_actual != normalized_expected {
         warn!("df validation failed. actual vs expected:");
         for i in 0..normalized_actual.len() {
@@ -110,3 +140,20 @@ pub fn df_value_validator(
 pub fn is_spark_path(relative_path: &Path) -> bool {
     relative_path.starts_with("spark/")
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    // Validation should fail for the below case:
+    // Actual - 'foo bar baz'
+    // Expected - 'bar <slt:ignore>'
+    #[test]
+    fn ignore_marker_does_not_skip_leading_text() {
+        // Actual snapshot contains unexpected prefix before the expected 
fragment.
+        let actual = vec![vec!["foo bar baz".to_string()]];
+        let expected = vec!["bar <slt:ignore>".to_string()];
+
+        assert!(!df_value_validator(value_normalizer, &actual, &expected));
+    }
+}
diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt 
b/datafusion/sqllogictest/test_files/limit_pruning.slt
new file mode 100644
index 0000000000..90ee528ecf
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/limit_pruning.slt
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+statement ok
+set datafusion.execution.parquet.pushdown_filters = true;
+
+
+statement ok
+CREATE TABLE tracking_data AS VALUES
+-- ***** Row Group 0 *****
+  ('Anow Vole', 7),
+  ('Brown Bear', 133),
+  ('Gray Wolf', 82),
+-- ***** Row Group 1 *****
+  ('Lynx', 71),
+  ('Red Fox', 40),
+  ('Alpine Bat', 6),
+-- ***** Row Group 2 *****
+  ('Nlpine Ibex', 101),
+  ('Nlpine Goat', 76),
+  ('Nlpine Sheep', 83),
+-- ***** Row Group 3 *****
+  ('Europ. Mole', 4),
+  ('Polecat', 16),
+  ('Alpine Ibex', 97);
+
+statement ok
+COPY (SELECT column1 as species, column2 as s FROM tracking_data)
+TO 'test_files/scratch/limit_pruning/data.parquet'
+STORED AS PARQUET
+OPTIONS (
+  'format.max_row_group_size' '3'
+);
+
+statement ok
+drop table tracking_data;
+
+statement ok
+CREATE EXTERNAL TABLE tracking_data
+STORED AS PARQUET
+LOCATION 'test_files/scratch/limit_pruning/data.parquet';
+
+
+statement ok
+set datafusion.explain.analyze_level = summary;
+
+# row_groups_pruned_statistics=4 total → 3 matched -> 1 fully matched
+# limit_pruned_row_groups=2 total → 0 matched
+query TT
+explain analyze select * from tracking_data where species > 'M' AND s >= 50 
limit 3;
+----
+Plan with Metrics DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
 projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M 
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND 
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, 
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, 
output_bytes=<slt:ignore>, files_ranges_pruned [...]
+
+# limit_pruned_row_groups=0 total → 0 matched
+# because of order by, scan needs to preserve sort, so limit pruning is 
disabled
+query TT
+explain analyze select * from tracking_data where species > 'M' AND s >= 50 
order by species limit 3;
+----
+Plan with Metrics
+01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], 
preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], 
metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
 projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 
50 AND DynamicFilter [ species@0 < Nlpine Sheep ], 
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND 
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != 
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], me [...]
+
+statement ok
+drop table tracking_data;
diff --git a/datafusion/sqllogictest/test_files/slt_features.slt 
b/datafusion/sqllogictest/test_files/slt_features.slt
new file mode 100644
index 0000000000..f3d467ea0d
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/slt_features.slt
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# =================================
+# Test sqllogictest runner features
+# =================================
+
+# --------------------------
+# Test `<slt:ignore>` marker
+# --------------------------
+query T
+select 'DataFusion'
+----
+<slt:ignore>
+
+query T
+select 'DataFusion'
+----
+Data<slt:ignore>
+
+query T
+select 'DataFusion'
+----
+<slt:ignore>Fusion
+
+query T
+select 'Apache DataFusion';
+----
+<slt:ignore>Data<slt:ignore>
+
+query T
+select 'DataFusion'
+----
+DataFusion<slt:ignore>
+
+query T
+select 'DataFusion'
+----
+<slt:ignore>DataFusion
+
+query T
+select 'DataFusion'
+----
+<slt:ignore>DataFusion<slt:ignore>
+
+query I
+select * from generate_series(3);
+----
+0
+1
+<slt:ignore>
+3
+
+query I
+select * from generate_series(3);
+----
+<slt:ignore>
+1
+<slt:ignore>
+<slt:ignore>
\ No newline at end of file
diff --git a/docs/source/user-guide/explain-usage.md 
b/docs/source/user-guide/explain-usage.md
index 5a1184539c..8fe8316381 100644
--- a/docs/source/user-guide/explain-usage.md
+++ b/docs/source/user-guide/explain-usage.md
@@ -228,6 +228,7 @@ When predicate pushdown is enabled, `DataSourceExec` with 
`ParquetSource` gains
 - `page_index_rows_pruned`: number of rows evaluated by page index filters. 
The metric reports both how many rows were considered in total and how many 
matched (were not pruned).
 - `row_groups_pruned_bloom_filter`: number of row groups evaluated by Bloom 
Filters, reporting both total checked groups and groups that matched.
 - `row_groups_pruned_statistics`: number of row groups evaluated by row-group 
statistics (min/max), reporting both total checked groups and groups that 
matched.
+- `limit_pruned_row_groups`: number of row groups pruned by the limit.
 - `pushdown_rows_matched`: rows that were tested by any of the above filters, 
and passed all of them.
 - `pushdown_rows_pruned`: rows that were tested by any of the above filters, 
and did not pass at least one of them.
 - `predicate_evaluation_errors`: number of times evaluating the filter 
expression failed (expected to be zero in normal operation)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to