Copilot commented on code in PR #292:
URL: https://github.com/apache/fluss-rust/pull/292#discussion_r2780959005


##########
bindings/python/src/table.rs:
##########
@@ -639,203 +639,100 @@ enum RowInput<'py> {
     List(Bound<'py, pyo3::types::PyList>),
 }
 
-/// Helper function to process sequence types (list/tuple) into datums
-fn process_sequence_to_datums<'a, I>(
-    values: I,
-    len: usize,
-    fields: &[fcore::metadata::DataField],
-) -> PyResult<Vec<fcore::row::Datum<'static>>>
-where
-    I: Iterator<Item = Bound<'a, PyAny>>,
-{
-    if len != fields.len() {
-        return Err(FlussError::new_err(format!(
-            "Expected {} values, got {}",
-            fields.len(),
-            len
-        )));
-    }
-
-    let mut datums = Vec::with_capacity(fields.len());
-    for (i, (field, value)) in fields.iter().zip(values).enumerate() {
-        datums.push(
-            python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                FlussError::new_err(format!("Field '{}' (index {}): {}", field.name(), i, e))
-            })?,
-        );
-    }
-    Ok(datums)
-}
-
-/// Convert Python row (dict/list/tuple) to GenericRow based on schema
+/// Convert Python row (dict/list/tuple) to GenericRow requiring all schema columns.
 pub fn python_to_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
 ) -> PyResult<fcore::row::GenericRow<'static>> {
-    // Extract with user-friendly error message
-    let row_input: RowInput = row.extract().map_err(|_| {
-        let type_name = row
-            .get_type()
-            .name()
-            .map(|n| n.to_string())
-            .unwrap_or_else(|_| "unknown".to_string());
-        FlussError::new_err(format!(
-            "Row must be a dict, list, or tuple; got {type_name}"
-        ))
-    })?;
-    let schema = table_info.row_type();
-    let fields = schema.fields();
-
-    let datums = match row_input {
-        RowInput::Dict(dict) => {
-            // Strict: reject unknown keys (and also reject non-str keys nicely)
-            for (k, _) in dict.iter() {
-                let key_str = k.extract::<&str>().map_err(|_| {
-                    let key_type = k
-                        .get_type()
-                        .name()
-                        .map(|n| n.to_string())
-                        .unwrap_or_else(|_| "unknown".to_string());
-                    FlussError::new_err(format!("Row dict keys must be strings; got {key_type}"))
-                })?;
-
-                if fields.iter().all(|f| f.name() != key_str) {
-                    let expected = fields
-                        .iter()
-                        .map(|f| f.name())
-                        .collect::<Vec<_>>()
-                        .join(", ");
-                    return Err(FlussError::new_err(format!(
-                        "Unknown field '{key_str}'. Expected fields: {expected}"
-                    )));
-                }
-            }
-
-            let mut datums = Vec::with_capacity(fields.len());
-            for field in fields {
-                let value = dict.get_item(field.name())?.ok_or_else(|| {
-                    FlussError::new_err(format!("Missing field: {}", field.name()))
-                })?;
-                datums.push(
-                    python_value_to_datum(&value, field.data_type()).map_err(|e| {
-                        FlussError::new_err(format!("Field '{}': {}", field.name(), e))
-                    })?,
-                );
-            }
-            datums
-        }
-
-        RowInput::List(list) => process_sequence_to_datums(list.iter(), list.len(), fields)?,
-
-        RowInput::Tuple(tuple) => process_sequence_to_datums(tuple.iter(), tuple.len(), fields)?,
-    };
+    let all_indices: Vec<usize> = (0..table_info.row_type().fields().len()).collect();
+    python_to_sparse_generic_row(row, table_info, &all_indices)
+}
 
-    Ok(fcore::row::GenericRow { values: datums })
+/// Process a Python sequence (list or tuple) into datums at the target column positions.
+fn process_sequence(
+    seq: &Bound<pyo3::types::PySequence>,
+    target_indices: &[usize],
+    fields: &[fcore::metadata::DataField],
+    datums: &mut [fcore::row::Datum<'static>],
+) -> PyResult<()> {
+    if seq.len()? != target_indices.len() {
+        return Err(FlussError::new_err(format!(
+            "Expected {} elements, got {}",
+            target_indices.len(),
+            seq.len()?
+        )));
+    }
+    for (i, &col_idx) in target_indices.iter().enumerate() {
+        let field = &fields[col_idx];
+        let value = seq.get_item(i)?;
+        datums[col_idx] = python_value_to_datum(&value, field.data_type())
+            .map_err(|e| FlussError::new_err(format!("Field '{}': {}", field.name(), e)))?;
+    }
+    Ok(())
 }
 
-/// Convert Python primary key values (dict/list/tuple) to GenericRow.
-/// Only requires PK columns; non-PK columns are filled with Null.
-/// For dict: keys should be PK column names.
-/// For list/tuple: values should be PK values in PK column order.
-pub fn python_pk_to_generic_row(
+/// Build a full-width GenericRow filling only the specified column
+/// indices from user input; all other columns are set to Null.
+pub fn python_to_sparse_generic_row(
     row: &Bound<PyAny>,
     table_info: &fcore::metadata::TableInfo,
+    target_indices: &[usize],
 ) -> PyResult<fcore::row::GenericRow<'static>> {

Review Comment:
   python_to_sparse_generic_row introduces new row parsing/mapping behavior 
(sparse column filling, sequence length checks, dict unknown/missing field 
errors). This file already has a #[cfg(test)] module, but there are no tests 
covering these new parsing paths; please add unit tests for dict/list/tuple 
inputs for both full-row (python_to_generic_row) and sparse-row cases to 
prevent regressions.



##########
bindings/python/src/upsert.rs:
##########
@@ -62,7 +64,11 @@ impl UpsertWriter {
     ///          For dict: keys are column names, values are column values.
     ///          For list/tuple: values must be in schema order.
     pub fn upsert(&self, row: &Bound<'_, PyAny>) -> PyResult<WriteResultHandle> {
-        let generic_row = python_to_generic_row(row, &self.inner.table_info)?;
+        let generic_row = if let Some(target_cols) = &self.inner.target_columns {
+            python_to_sparse_generic_row(row, &self.inner.table_info, target_cols)?
+        } else {
+            python_to_generic_row(row, &self.inner.table_info)?

Review Comment:
   UpsertWriter.upsert() now accepts partial updates (target_columns set), in 
which case list/tuple inputs are interpreted in *target column order* (not 
schema order). The docstring still states that list/tuple values must be in 
schema order, which is misleading for partial updates; please update the 
docstring to describe the conditional behavior (full upsert = schema order; 
partial update = provided columns/indices order).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to