Copilot commented on code in PR #502:
URL: https://github.com/apache/hudi-rs/pull/502#discussion_r2649847449


##########
crates/core/src/table/mod.rs:
##########
@@ -629,14 +632,40 @@ impl Table {
             .await
     }
 
-    /// Fetch file records from the metadata table.
+    /// Fetch file records from metadata table with optional partition pruning.
     ///
-    /// The metadata table returns records as-of its current state. For time travel
-    /// or incremental queries, the timestamp filtering is handled by the caller
-    /// using completion time views - the metadata table just provides the file listing.
-    async fn fetch_metadata_table_records(&self) -> Result<HashMap<String, FilesPartitionRecord>> {
+    /// When a partition_pruner is provided and has filters, this uses targeted
+    /// HFile lookups to read only the needed partitions. Otherwise, falls back
+    /// to reading all partitions.
+    async fn fetch_metadata_table_records_pruned(
+        &self,
+        partition_pruner: &PartitionPruner,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
         let metadata_table = self.new_metadata_table().await?;
-        metadata_table.read_metadata_files().await
+
+        // If no partition filters, read all
+        if partition_pruner.is_empty() {
+            return metadata_table.read_metadata_files().await;
+        }
+
+        // Step 1: Get all partition paths from __all_partitions__
+        let all_partitions = metadata_table.read_all_partition_paths_internal().await?;
+
+        // Step 2: Apply partition pruning (CPU only, no I/O)
+        let pruned: Vec<&str> = all_partitions
+            .iter()
+            .filter(|p| partition_pruner.should_include(p))
+            .map(|s| s.as_str())
+            .collect();
+

Review Comment:
   The keys passed to `read_metadata_files_for_keys_internal` must be sorted in 
ascending order because `HFileReader::lookup_records()` requires sorted keys 
for its efficient forward scan algorithm. The current implementation filters 
partitions from `all_partitions` which does not guarantee sorted order. This 
could lead to incorrect behavior or suboptimal performance when partition paths 
are not naturally sorted.
   
   Consider sorting the pruned keys before passing them to the lookup function.
   ```suggestion
           let mut pruned: Vec<&str> = all_partitions
               .iter()
               .filter(|p| partition_pruner.should_include(p))
               .map(|s| s.as_str())
               .collect();
   
           // Ensure keys are sorted in ascending order as required by
           // `read_metadata_files_for_keys_internal` / `HFileReader::lookup_records()`.
           pruned.sort();
   ```



##########
crates/core/src/file_group/reader.rs:
##########
@@ -989,4 +1087,26 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_read_metadata_files_for_keys_returns_only_requested() -> Result<()> {
+        let reader = create_metadata_table_reader()?;
+
+        // Request only specific keys
+        let keys = vec!["__all_partitions__", "city=chennai"];
+        let result = reader.read_metadata_files_for_keys_blocking(
+            METADATA_TABLE_FILES_BASE_FILE,
+            METADATA_TABLE_FILES_LOG_FILES.to_vec(),
+            &keys,

Review Comment:
   The test only uses sorted keys which doesn't test the actual usage pattern 
where partition pruning may produce unsorted keys. Consider adding a test case 
with unsorted keys to verify the lookup behavior works correctly when 
partitions are not naturally sorted (e.g., `["city=sao_paulo", "city=chennai"]`).



##########
crates/core/src/metadata/merger.rs:
##########
@@ -533,4 +585,45 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_merge_for_keys_filters_to_requested_keys() -> crate::Result<()> {
+        // Get base records from the HFile
+        let dir = files_partition_dir();
+        let mut hfiles: Vec<_> = std::fs::read_dir(&dir)
+            .unwrap()
+            .filter_map(|e| e.ok())
+            .filter(|e| {
+                e.path()
+                    .extension()
+                    .map(|ext| ext == "hfile")
+                    .unwrap_or(false)
+            })
+            .collect();
+        hfiles.sort_by_key(|e| e.file_name());
+
+        let hfile_path = hfiles.last().expect("No HFile found").path();
+        let bytes = std::fs::read(&hfile_path).expect("Failed to read HFile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create HFileReader");
+        let schema = reader
+            .get_avro_schema()
+            .expect("Failed to get schema")
+            .expect("No schema in HFile")
+            .clone();
+        let base_records = reader.collect_records().expect("Failed to collect records");
+
+        let merger = FilesPartitionMerger::new(schema);
+
+        // Request only specific keys
+        let keys = vec!["__all_partitions__", "city=chennai"];
+        let merged = merger.merge_for_keys(&base_records, &[], &keys)?;

Review Comment:
   Similar to the test in file_group/reader.rs, this test only uses sorted 
keys. Consider adding a test case with unsorted keys to ensure the code handles 
real-world partition pruning scenarios where partitions may not be naturally 
sorted.



##########
crates/core/src/table/mod.rs:
##########
@@ -854,6 +883,205 @@ impl Table {
             .block_on(self.read_metadata_files())
     }
 
+    /// Get all partition paths from the metadata table.
+    ///
+    /// This reads only the `__all_partitions__` record from the metadata table,
+    /// which contains the list of all partition paths in the table. This is much
+    /// cheaper than reading all partition records.
+    ///
+    /// # Returns
+    /// A vector of partition paths (e.g., ["city=chennai", "city=sf"]).
+    /// Returns empty vector for non-partitioned tables.
+    ///
+    /// # Errors
+    /// Returns error if metadata table is not enabled or cannot be read.
+    pub async fn get_all_partition_paths(&self) -> Result<Vec<String>> {
+        if !self.is_metadata_table_enabled() {
+            return Err(CoreError::MetadataTable(
+                "Metadata table is not enabled".to_string(),
+            ));
+        }
+
+        let metadata_table = self.new_metadata_table().await?;
+        metadata_table.read_all_partition_paths_internal().await
+    }
+
+    /// Same as [Table::get_all_partition_paths], but blocking.
+    pub fn get_all_partition_paths_blocking(&self) -> Result<Vec<String>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(self.get_all_partition_paths())
+    }
+
+    /// Read partition paths from __all_partitions__ record.
+    /// Internal method used by metadata tables.
+    async fn read_all_partition_paths_internal(&self) -> Result<Vec<String>> {
+        // Get the files partition file slice
+        let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else {
+            return Ok(Vec::new());
+        };
+
+        let excludes = self
+            .timeline
+            .get_replaced_file_groups_as_of(timestamp)
+            .await?;
+        let filters = from_str_tuples([(
+            METADATA_TABLE_PARTITION_FIELD,
+            "=",
+            FilesPartitionRecord::PARTITION_NAME,
+        )])?;
+        let partition_schema = self.get_partition_schema().await?;
+        let partition_pruner =
+            PartitionPruner::new(&filters, &partition_schema, self.hudi_configs.as_ref())?;
+        let completion_time_view = self.timeline.create_completion_time_view();
+        let file_slices = self
+            .file_system_view
+            .get_file_slices_as_of(
+                timestamp,
+                &partition_pruner,
+                &excludes,
+                None,
+                &completion_time_view,
+            )
+            .await?;
+
+        if file_slices.len() != 1 {
+            return Err(CoreError::MetadataTable(format!(
+                "Expected 1 file slice for {} partition, got {}",
+                FilesPartitionRecord::PARTITION_NAME,
+                file_slices.len()
+            )));
+        }
+
+        let file_slice = &file_slices[0];
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        // Read only __all_partitions__ record
+        let keys = vec!["__all_partitions__"];
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = file_slice
+            .log_files
+            .iter()
+            .map(|log_file| file_slice.log_file_relative_path(log_file))
+            .collect::<Result<Vec<String>>>()?;
+
+        let records = fg_reader
+            .read_metadata_files_for_keys(&base_file_path, log_file_paths, &keys)
+            .await?;
+
+        // Extract partition names from __all_partitions__ record
+        match records.get("__all_partitions__") {
+            Some(record) => Ok(record.partition_names().into_iter().map(String::from).collect()),
+            None => Ok(Vec::new()),
+        }
+    }

Review Comment:
   There is significant code duplication between 
`read_all_partition_paths_internal` and 
`read_metadata_files_for_keys_internal`. Both methods share identical logic for 
getting file slices from the metadata table (lines 925-970 and 1035-1078 
respectively). Consider extracting the common logic into a helper method that 
returns the file slice, then call it from both methods to improve 
maintainability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to