Copilot commented on code in PR #6:
URL: https://github.com/apache/fluss-rust/pull/6#discussion_r2354744831


##########
bindings/python/src/utils.rs:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
+use std::sync::Arc;
+use arrow_pyarrow::ToPyArrow;
+use crate::*;
+
+/// Utilities for schema conversion between PyArrow, Arrow, and Fluss
+pub struct Utils;
+
+impl Utils {
+    /// Convert PyArrow schema to Rust Arrow schema
+    pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> 
PyResult<SchemaRef> {
+        Python::with_gil(|py| {
+            let schema_bound = py_schema.bind(py);
+            
+            let schema: ArrowSchema = 
arrow_pyarrow::FromPyArrow::from_pyarrow_bound(&schema_bound)
+                .map_err(|e| FlussError::new_err(format!("Failed to convert 
PyArrow schema: {}", e)))?;
+            Ok(Arc::new(schema))
+        })
+    }
+
+    /// Convert Arrow DataType to Fluss DataType
+    pub fn arrow_type_to_fluss_type(arrow_type: &arrow::datatypes::DataType) 
-> PyResult<fcore::metadata::DataType> {
+        use arrow::datatypes::DataType as ArrowDataType;
+        use fcore::metadata::DataTypes;
+
+        let fluss_type = match arrow_type {
+            ArrowDataType::Boolean => DataTypes::boolean(),
+            ArrowDataType::Int8 => DataTypes::tinyint(),
+            ArrowDataType::Int16 => DataTypes::smallint(),
+            ArrowDataType::Int32 => DataTypes::int(),
+            ArrowDataType::Int64 => DataTypes::bigint(),
+            ArrowDataType::UInt8 => DataTypes::tinyint(),
+            ArrowDataType::UInt16 => DataTypes::smallint(),
+            ArrowDataType::UInt32 => DataTypes::int(),
+            ArrowDataType::UInt64 => DataTypes::bigint(),
+            ArrowDataType::Float32 => DataTypes::float(),
+            ArrowDataType::Float64 => DataTypes::double(),
+            ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => 
DataTypes::string(),
+            ArrowDataType::Binary | ArrowDataType::LargeBinary => 
DataTypes::bytes(),
+            ArrowDataType::Date32 => DataTypes::date(),
+            ArrowDataType::Date64 => DataTypes::date(),
+            ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => 
DataTypes::time(),
+            ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(),
+            ArrowDataType::Decimal128(precision, scale) => 
DataTypes::decimal(*precision as u32, *scale as u32),
+            _ => {
+                return Err(FlussError::new_err(format!(
+                    "Unsupported Arrow data type: {:?}", arrow_type
+                )));
+            }
+        };
+
+        Ok(fluss_type)
+    }
+
+    /// Convert Fluss DataType to string representation
+    pub fn datatype_to_string(data_type: &fcore::metadata::DataType) -> String 
{
+        match data_type {
+            fcore::metadata::DataType::Boolean(_) => "boolean".to_string(),
+            fcore::metadata::DataType::TinyInt(_) => "tinyint".to_string(),
+            fcore::metadata::DataType::SmallInt(_) => "smallint".to_string(),
+            fcore::metadata::DataType::Int(_) => "int".to_string(),
+            fcore::metadata::DataType::BigInt(_) => "bigint".to_string(),
+            fcore::metadata::DataType::Float(_) => "float".to_string(),
+            fcore::metadata::DataType::Double(_) => "double".to_string(),
+            fcore::metadata::DataType::String(_) => "string".to_string(),
+            fcore::metadata::DataType::Bytes(_) => "bytes".to_string(),
+            fcore::metadata::DataType::Date(_) => "date".to_string(),
+            fcore::metadata::DataType::Time(t) => {
+                if t.precision() == 0 {
+                    "time".to_string()
+                } else {
+                    format!("time({})", t.precision())
+                }
+            },
+            fcore::metadata::DataType::Timestamp(t) => {
+                if t.precision() == 6 {
+                    "timestamp".to_string()
+                } else {
+                    format!("timestamp({})", t.precision())
+                }
+            },
+            fcore::metadata::DataType::TimestampLTz(t) => {
+                if t.precision() == 6 {
+                    "timestamp_ltz".to_string()
+                } else {
+                    format!("timestamp_ltz({})", t.precision())
+                }
+            },
+            fcore::metadata::DataType::Char(c) => format!("char({})", 
c.length()),
+            fcore::metadata::DataType::Decimal(d) => format!("decimal({},{})", 
d.precision(), d.scale()),
+            fcore::metadata::DataType::Binary(b) => format!("binary({})", 
b.length()),
+            fcore::metadata::DataType::Array(arr) => format!("array<{}>", 
Utils::datatype_to_string(arr.get_element_type())),
+            fcore::metadata::DataType::Map(map) => format!("map<{},{}>", 
+                                        
Utils::datatype_to_string(map.key_type()), 
+                                        
Utils::datatype_to_string(map.value_type())),
+            fcore::metadata::DataType::Row(row) => {
+                let fields: Vec<String> = row.fields().iter()
+                    .map(|field| format!("{}: {}", field.name(), 
Utils::datatype_to_string(field.data_type())))
+                    .collect();
+                format!("row<{}>", fields.join(", "))
+            },
+        }
+    }
+
+    /// Parse log format string to LogFormat enum
+    pub fn parse_log_format(format_str: &str) -> 
PyResult<fcore::metadata::LogFormat> {
+        fcore::metadata::LogFormat::parse(format_str)
+            .map_err(|e| FlussError::new_err(format!("Invalid log format '{}': 
{}", format_str, e)))
+    }
+
+    /// Parse kv format string to KvFormat enum
+    pub fn parse_kv_format(format_str: &str) -> 
PyResult<fcore::metadata::KvFormat> {
+        fcore::metadata::KvFormat::parse(format_str)
+            .map_err(|e| FlussError::new_err(format!("Invalid kv format '{}': 
{}", format_str, e)))
+    }
+
+    /// Convert ScanRecords to Arrow RecordBatch
+    pub fn convert_scan_records_to_arrow(
+        _scan_records: fcore::record::ScanRecords,
+    ) -> Vec<Arc<arrow::record_batch::RecordBatch>> {
+        let mut result = Vec::new();
+        for(_, records) in _scan_records.into_records() {
+            for record in records {
+                let columnar_row = record.row();
+                let row_id = columnar_row.get_row_id();
+                if row_id == 0 {

Review Comment:
   Magic number 0 for row_id comparison lacks explanation. Consider adding a 
comment explaining why only records with row_id == 0 are processed, or defining 
a named constant for this value.



##########
bindings/python/src/utils.rs:
##########
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use arrow::datatypes::{Schema as ArrowSchema, SchemaRef};
+use std::sync::Arc;
+use arrow_pyarrow::ToPyArrow;
+use crate::*;
+
+/// Utilities for schema conversion between PyArrow, Arrow, and Fluss
+pub struct Utils;
+
+impl Utils {
+    /// Convert PyArrow schema to Rust Arrow schema
+    pub fn pyarrow_to_arrow_schema(py_schema: &PyObject) -> 
PyResult<SchemaRef> {
+        Python::with_gil(|py| {
+            let schema_bound = py_schema.bind(py);
+            
+            let schema: ArrowSchema = 
arrow_pyarrow::FromPyArrow::from_pyarrow_bound(&schema_bound)
+                .map_err(|e| FlussError::new_err(format!("Failed to convert 
PyArrow schema: {}", e)))?;
+            Ok(Arc::new(schema))
+        })
+    }
+
+    /// Convert Arrow DataType to Fluss DataType
+    pub fn arrow_type_to_fluss_type(arrow_type: &arrow::datatypes::DataType) 
-> PyResult<fcore::metadata::DataType> {
+        use arrow::datatypes::DataType as ArrowDataType;
+        use fcore::metadata::DataTypes;
+
+        let fluss_type = match arrow_type {
+            ArrowDataType::Boolean => DataTypes::boolean(),
+            ArrowDataType::Int8 => DataTypes::tinyint(),
+            ArrowDataType::Int16 => DataTypes::smallint(),
+            ArrowDataType::Int32 => DataTypes::int(),
+            ArrowDataType::Int64 => DataTypes::bigint(),
+            ArrowDataType::UInt8 => DataTypes::tinyint(),
+            ArrowDataType::UInt16 => DataTypes::smallint(),
+            ArrowDataType::UInt32 => DataTypes::int(),
+            ArrowDataType::UInt64 => DataTypes::bigint(),

Review Comment:
   Arrow unsigned integer types are mapped to Fluss signed integer types of the 
same width. This will cause data corruption for values exceeding the signed 
range: for example, UInt8 values > 127 cannot be represented in tinyint, and 
UInt64 values > i64::MAX cannot be represented in bigint.
   ```suggestion
               ArrowDataType::UInt8 => {
                   // Use Fluss unsigned tinyint if available, else error
                   #[allow(unused)]
                   {
                       #[cfg(feature = "unsigned_types")]
                       { DataTypes::utinyint() }
                       #[cfg(not(feature = "unsigned_types"))]
                       { return Err(FlussError::new_err("Arrow UInt8 type 
cannot be mapped to Fluss signed tinyint without data loss. Please add support 
for unsigned types.")); }
                   }
               },
               ArrowDataType::UInt16 => {
                   #[allow(unused)]
                   {
                       #[cfg(feature = "unsigned_types")]
                       { DataTypes::usmallint() }
                       #[cfg(not(feature = "unsigned_types"))]
                       { return Err(FlussError::new_err("Arrow UInt16 type 
cannot be mapped to Fluss signed smallint without data loss. Please add support 
for unsigned types.")); }
                   }
               },
               ArrowDataType::UInt32 => {
                   #[allow(unused)]
                   {
                       #[cfg(feature = "unsigned_types")]
                       { DataTypes::uint() }
                       #[cfg(not(feature = "unsigned_types"))]
                       { return Err(FlussError::new_err("Arrow UInt32 type 
cannot be mapped to Fluss signed int without data loss. Please add support for 
unsigned types.")); }
                   }
               },
               ArrowDataType::UInt64 => {
                   #[allow(unused)]
                   {
                       #[cfg(feature = "unsigned_types")]
                       { DataTypes::ubigint() }
                       #[cfg(not(feature = "unsigned_types"))]
                       { return Err(FlussError::new_err("Arrow UInt64 type 
cannot be mapped to Fluss signed bigint without data loss. Please add support 
for unsigned types.")); }
                   }
               },
   ```



##########
bindings/python/src/connection.rs:
##########
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use pyo3::prelude::*;
+use crate::*;
+use std::sync::Arc;
+use pyo3_async_runtimes::tokio::future_into_py;
+
+/// Connection to a Fluss cluster
+#[pyclass]
+pub struct FlussConnection {
+    inner: Arc<fcore::client::FlussConnection>,
+}
+
+#[pymethods]
+impl FlussConnection {
+    #[new]
+    fn new(config: &Config) -> PyResult<Self> {
+        // Always use connect to create a new connection
+        Err(FlussError::new_err("Use FlussConnection.connect() instead."))

Review Comment:
   [nitpick] The constructor prevents direct instantiation, but the error 
message could be more informative. Consider explaining why direct instantiation 
is not allowed, or providing a usage example.
   ```suggestion
           Err(FlussError::new_err(
               "Direct instantiation of FlussConnection is not allowed because 
connections must be established asynchronously. \
   Please use FlussConnection.connect() instead. For example:\n\
       conn = await FlussConnection.connect(config)"
           ))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to