leekeiabstraction commented on code in PR #292:
URL: https://github.com/apache/fluss-rust/pull/292#discussion_r2779668059
##########
bindings/python/src/table.rs:
##########
@@ -639,202 +639,98 @@ enum RowInput<'py> {
List(Bound<'py, pyo3::types::PyList>),
}
-/// Helper function to process sequence types (list/tuple) into datums
-fn process_sequence_to_datums<'a, I>(
- values: I,
- len: usize,
- fields: &[fcore::metadata::DataField],
-) -> PyResult<Vec<fcore::row::Datum<'static>>>
-where
- I: Iterator<Item = Bound<'a, PyAny>>,
-{
- if len != fields.len() {
- return Err(FlussError::new_err(format!(
- "Expected {} values, got {}",
- fields.len(),
- len
- )));
- }
-
- let mut datums = Vec::with_capacity(fields.len());
- for (i, (field, value)) in fields.iter().zip(values).enumerate() {
- datums.push(
- python_value_to_datum(&value, field.data_type()).map_err(|e| {
- FlussError::new_err(format!("Field '{}' (index {}): {}",
field.name(), i, e))
- })?,
- );
- }
- Ok(datums)
-}
-
-/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+/// Convert Python row (dict/list/tuple) to GenericRow requiring all schema
columns.
pub fn python_to_generic_row(
row: &Bound<PyAny>,
table_info: &fcore::metadata::TableInfo,
) -> PyResult<fcore::row::GenericRow<'static>> {
- // Extract with user-friendly error message
- let row_input: RowInput = row.extract().map_err(|_| {
- let type_name = row
- .get_type()
- .name()
- .map(|n| n.to_string())
- .unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!(
- "Row must be a dict, list, or tuple; got {type_name}"
- ))
- })?;
- let schema = table_info.row_type();
- let fields = schema.fields();
-
- let datums = match row_input {
- RowInput::Dict(dict) => {
- // Strict: reject unknown keys (and also reject non-str keys
nicely)
- for (k, _) in dict.iter() {
- let key_str = k.extract::<&str>().map_err(|_| {
- let key_type = k
- .get_type()
- .name()
- .map(|n| n.to_string())
- .unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!("Row dict keys must be
strings; got {key_type}"))
- })?;
-
- if fields.iter().all(|f| f.name() != key_str) {
- let expected = fields
- .iter()
- .map(|f| f.name())
- .collect::<Vec<_>>()
- .join(", ");
- return Err(FlussError::new_err(format!(
- "Unknown field '{key_str}'. Expected fields:
{expected}"
- )));
- }
- }
-
- let mut datums = Vec::with_capacity(fields.len());
- for field in fields {
- let value = dict.get_item(field.name())?.ok_or_else(|| {
- FlussError::new_err(format!("Missing field: {}",
field.name()))
- })?;
- datums.push(
- python_value_to_datum(&value,
field.data_type()).map_err(|e| {
- FlussError::new_err(format!("Field '{}': {}",
field.name(), e))
- })?,
- );
- }
- datums
- }
-
- RowInput::List(list) => process_sequence_to_datums(list.iter(),
list.len(), fields)?,
-
- RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(),
tuple.len(), fields)?,
- };
-
- Ok(fcore::row::GenericRow { values: datums })
+ let all_indices: Vec<usize> =
(0..table_info.row_type().fields().len()).collect();
+ python_to_sparse_generic_row(row, table_info, &all_indices)
}
-/// Convert Python primary key values (dict/list/tuple) to GenericRow.
-/// Only requires PK columns; non-PK columns are filled with Null.
-/// For dict: keys should be PK column names.
-/// For list/tuple: values should be PK values in PK column order.
-pub fn python_pk_to_generic_row(
+/// Build a full-width GenericRow filling only the specified column
+/// indices from user input; all other columns are set to Null.
+pub fn python_to_sparse_generic_row(
row: &Bound<PyAny>,
table_info: &fcore::metadata::TableInfo,
+ target_indices: &[usize],
) -> PyResult<fcore::row::GenericRow<'static>> {
- let schema = table_info.get_schema();
let row_type = table_info.row_type();
let fields = row_type.fields();
- let pk_indexes = schema.primary_key_indexes();
- let pk_names: Vec<&str> = schema.primary_key_column_names();
-
- if pk_indexes.is_empty() {
- return Err(FlussError::new_err(
- "Table has no primary key; cannot use PK-only row",
- ));
- }
+ let target_names: Vec<&str> = target_indices.iter().map(|&i|
fields[i].name()).collect();
- // Initialize all datums as Null
let mut datums: Vec<fcore::row::Datum<'static>> =
vec![fcore::row::Datum::Null; fields.len()];
- // Extract with user-friendly error message
let row_input: RowInput = row.extract().map_err(|_| {
let type_name = row
.get_type()
.name()
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
FlussError::new_err(format!(
- "PK row must be a dict, list, or tuple; got {type_name}"
+ "Row must be a dict, list, or tuple; got {type_name}"
))
})?;
match row_input {
RowInput::Dict(dict) => {
- // Validate keys are PK columns
for (k, _) in dict.iter() {
let key_str = k.extract::<&str>().map_err(|_| {
let key_type = k
.get_type()
.name()
.map(|n| n.to_string())
.unwrap_or_else(|_| "unknown".to_string());
- FlussError::new_err(format!("PK dict keys must be strings;
got {key_type}"))
+ FlussError::new_err(format!("Dict keys must be strings;
got {key_type}"))
})?;
-
- if !pk_names.contains(&key_str) {
+ if !target_names.contains(&key_str) {
return Err(FlussError::new_err(format!(
- "Unknown PK field '{}'. Expected PK fields: {}",
+ "Unknown field '{}'. Expected: {}",
key_str,
- pk_names.join(", ")
+ target_names.join(", ")
)));
}
}
-
- // Extract PK values
- for (i, pk_idx) in pk_indexes.iter().enumerate() {
- let pk_name = pk_names[i];
- let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ for (i, &col_idx) in target_indices.iter().enumerate() {
+ let name = target_names[i];
+ let field = &fields[col_idx];
let value = dict
- .get_item(pk_name)?
- .ok_or_else(|| FlussError::new_err(format!("Missing PK
field: {pk_name}")))?;
- datums[*pk_idx] = python_value_to_datum(&value,
field.data_type())
- .map_err(|e| FlussError::new_err(format!("PK field
'{pk_name}': {e}")))?;
+ .get_item(name)?
+ .ok_or_else(|| FlussError::new_err(format!("Missing field:
{name}")))?;
+ datums[col_idx] = python_value_to_datum(&value,
field.data_type())
+ .map_err(|e| FlussError::new_err(format!("Field '{name}':
{e}")))?;
}
}
RowInput::List(list) => {
- if list.len() != pk_indexes.len() {
+ if list.len() != target_indices.len() {
return Err(FlussError::new_err(format!(
- "PK list must have {} elements (PK columns), got {}",
- pk_indexes.len(),
+ "Expected {} elements, got {}",
+ target_indices.len(),
list.len()
)));
}
- for (i, pk_idx) in pk_indexes.iter().enumerate() {
- let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ for (i, &col_idx) in target_indices.iter().enumerate() {
+ let field = &fields[col_idx];
let value = list.get_item(i)?;
- datums[*pk_idx] =
- python_value_to_datum(&value,
field.data_type()).map_err(|e| {
- FlussError::new_err(format!("PK field '{}': {}",
field.name(), e))
- })?;
+ datums[col_idx] = python_value_to_datum(&value,
field.data_type())
+ .map_err(|e| FlussError::new_err(format!("Field '{}': {}",
field.name(), e)))?;
}
}
RowInput::Tuple(tuple) => {
- if tuple.len() != pk_indexes.len() {
+ if tuple.len() != target_indices.len() {
return Err(FlussError::new_err(format!(
- "PK tuple must have {} elements (PK columns), got {}",
- pk_indexes.len(),
+ "Expected {} elements, got {}",
+ target_indices.len(),
tuple.len()
)));
}
- for (i, pk_idx) in pk_indexes.iter().enumerate() {
- let field: &fcore::metadata::DataField = &fields[*pk_idx];
+ for (i, &col_idx) in target_indices.iter().enumerate() {
+ let field = &fields[col_idx];
let value = tuple.get_item(i)?;
- datums[*pk_idx] =
- python_value_to_datum(&value,
field.data_type()).map_err(|e| {
- FlussError::new_err(format!("PK field '{}': {}",
field.name(), e))
- })?;
+ datums[col_idx] = python_value_to_datum(&value,
field.data_type())
+ .map_err(|e| FlussError::new_err(format!("Field '{}': {}",
field.name(), e)))?;
Review Comment:
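nit: This arm is nearly identical to the `RowInput::List` arm; the only difference is the container type. Could we refactor to remove the duplication? For example, extract a helper that takes an iterator over the values, similar to the `process_sequence_to_datums` function removed above.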
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]