This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 441decb fix: avoid dashmap deadlock for correlated queries (#539)
441decb is described below
commit 441decb3ae36b73b6cdcd276de4cce21f0fa8bdf
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Mar 9 10:30:38 2026 -0500
fix: avoid dashmap deadlock for correlated queries (#539)
---
crates/core/src/schema/delete.rs | 31 ++++++++++++-------------------
crates/core/src/table/fs_view.rs | 18 +++++++++++++-----
2 files changed, 25 insertions(+), 24 deletions(-)
diff --git a/crates/core/src/schema/delete.rs b/crates/core/src/schema/delete.rs
index 7f5efd1..351e9dc 100644
--- a/crates/core/src/schema/delete.rs
+++ b/crates/core/src/schema/delete.rs
@@ -25,20 +25,15 @@ use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use once_cell::sync::Lazy;
use serde_json::Value as JsonValue;
-use std::fs;
-use std::fs::File;
-use std::path::PathBuf;
use std::sync::Arc;
-static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
- let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
- .join("schemas")
- .join("HoodieDeleteRecord.avsc");
-
- let content = fs::read_to_string(schema_path)
- .map_err(|e| CoreError::Schema(format!("Failed to read schema file: {e}")))?;
+static DELETE_RECORD_AVRO_SCHEMA_STR: &str = include_str!(concat!(
+ env!("CARGO_MANIFEST_DIR"),
+ "/schemas/HoodieDeleteRecord.avsc"
+));
- serde_json::from_str(&content)
+static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
+ serde_json::from_str(DELETE_RECORD_AVRO_SCHEMA_STR)
 .map_err(|e| CoreError::Schema(format!("Failed to parse schema to JSON: {e}")))
});
@@ -110,15 +105,13 @@ pub fn avro_schema_for_delete_record(delete_record_value: &AvroValue) -> Result<
AvroSchema::parse(&json).map_err(CoreError::AvroError)
}
-static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
- let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
- .join("schemas")
- .join("HoodieDeleteRecordList.avsc");
-
- let mut file = File::open(&schema_path)
- .map_err(|e| CoreError::Schema(format!("Failed to open schema file: {e}")))?;
+static DELETE_RECORD_LIST_AVRO_SCHEMA_STR: &str = include_str!(concat!(
+ env!("CARGO_MANIFEST_DIR"),
+ "/schemas/HoodieDeleteRecordList.avsc"
+));
- AvroSchema::parse_reader(&mut file).map_err(CoreError::AvroError)
+static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
+ AvroSchema::parse_str(DELETE_RECORD_LIST_AVRO_SCHEMA_STR).map_err(CoreError::AvroError)
});
pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> {
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 223a86f..a3740c1 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -210,6 +210,10 @@ impl FileSystemView {
}
/// Collect file slices from loaded file groups using the timeline view.
+ ///
+ /// File slices are first collected from the DashMap using read locks (released
+ /// promptly), then metadata is loaded on the owned clones without holding any
+ /// locks.
async fn collect_file_slices(
&self,
partition_pruner: &PartitionPruner,
@@ -219,21 +223,25 @@ impl FileSystemView {
let excluding_file_groups = timeline_view.excluding_file_groups();
let mut file_slices = Vec::new();
- for mut partition_entry in self.partition_to_file_groups.iter_mut() {
+ for partition_entry in self.partition_to_file_groups.iter() {
if !partition_pruner.should_include(partition_entry.key()) {
continue;
}
- let file_groups = partition_entry.value_mut();
- for fg in file_groups.iter_mut() {
+ let file_groups = partition_entry.value();
+ for fg in file_groups.iter() {
if excluding_file_groups.contains(fg) {
continue;
}
- if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
- fsl.load_metadata_if_needed(&self.storage).await?;
+ if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
file_slices.push(fsl.clone());
}
}
}
+
+ for fsl in &mut file_slices {
+ fsl.load_metadata_if_needed(&self.storage).await?;
+ }
+
Ok(file_slices)
}