Copilot commented on code in PR #228:
URL: https://github.com/apache/fluss-rust/pull/228#discussion_r2746929479


##########
crates/fluss/src/client/table/writer.rs:
##########
@@ -15,21 +15,42 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::row::{GenericRow, InternalRow};
+use crate::client::WriterClient;
+use crate::client::table::partition_getter::PartitionGetter;
+use crate::row::InternalRow;
+use arrow::array::RecordBatch;
+use std::sync::Arc;
 
 use crate::error::Result;
+use crate::metadata::{TableInfo, TablePath};
 
-#[allow(dead_code, async_fn_in_trait)]
+#[allow(async_fn_in_trait)]
 pub trait TableWriter {
     async fn flush(&self) -> Result<()>;
 }
 
-#[allow(dead_code)]
+#[allow(async_fn_in_trait)]
 pub trait AppendWriter: TableWriter {
-    async fn append(&self, row: GenericRow) -> Result<()>;
+    async fn append<R: InternalRow>(&self, row: &R) -> Result<()>;
+
+    async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()>;
+}
+
+#[allow(dead_code)]
+pub struct AppendWriterImpl {
+    table_path: Arc<TablePath>,
+    partition_getter: Option<PartitionGetter>,
+    writer_client: Arc<WriterClient>,
+    table_info: Arc<TableInfo>,
+}
+
+impl TableWriter for AppendWriterImpl {
+    async fn flush(&self) -> Result<()> {
+        self.writer_client.flush().await
+    }
 }

Review Comment:
   This file defines a public `AppendWriterImpl` struct but it doesn’t 
implement the `AppendWriter` trait and duplicates the `AppendWriterImpl` 
defined in `append.rs`. This is confusing and can lead to accidental use of the 
wrong type (missing `append/append_arrow_batch`). Consider removing this struct 
from `writer.rs` (keep only traits), or implement `AppendWriter` here and avoid 
a second implementation type in `append.rs`.



##########
crates/fluss/src/client/table/append.rs:
##########
@@ -70,19 +93,27 @@ impl AppendWriter {
         result_handle.result(result)
     }
 
-    pub async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+    async fn append_arrow_batch(&self, batch: RecordBatch) -> Result<()> {
+        let physical_table_path = if self.partition_getter.is_some() && 
batch.num_rows() > 0 {
+            // For partitioned tables, extract partition from first row
+            let columnar_row = ColumnarRow::new(Arc::new(batch.clone()));
+            Arc::new(get_physical_path(
+                &self.table_path,
+                self.partition_getter.as_ref(),
+                &columnar_row,
+            )?)
+        } else {
+            Arc::new(PhysicalTablePath::of(Arc::clone(&self.table_path)))
+        };

Review Comment:
   `append_arrow_batch` derives the `PhysicalTablePath` from only the first row 
when the table is partitioned. If the `RecordBatch` contains rows from multiple 
partitions, all rows will be written to the first row’s partition with no 
error. Consider either (a) validating that all rows map to the same partition 
and returning an error if not, or (b) splitting the batch by partition and 
sending one write per partition.



##########
crates/fluss/src/row/mod.rs:
##########
@@ -55,7 +55,7 @@ impl<'a> BinaryRow<'a> {
 }
 
 // TODO make functions return Result<?> for better error handling

Review Comment:
   `InternalRow` is a public trait, and adding `Send + Sync` is a breaking 
change for any downstream code implementing it (types containing `Rc`, 
`RefCell`, etc.). If cross-thread usage isn’t required, consider keeping 
`InternalRow` unconstrained and adding `Send + Sync` bounds only where needed 
(e.g., on writer APIs), or document this as an intentional breaking API change.
   ```suggestion
   // TODO make functions return Result<?> for better error handling
   /// Represents a logical row of data.
   ///
   /// This trait is intentionally required to be `Send + Sync` so that 
implementations
   /// can be safely shared and used across threads (e.g., by writers or other
   /// concurrent components). Adding these bounds is a breaking API change for 
any
   /// downstream implementors that rely on non-`Send`/`Sync` types (such as 
`Rc` or
   /// `RefCell`), and this constraint is part of the public contract of 
`InternalRow`.
   ```



##########
crates/fluss/src/client/table/mod.rs:
##########
@@ -34,13 +34,13 @@ mod upsert;
 mod writer;
 
 use crate::client::table::upsert::TableUpsert;
-pub use append::{AppendWriter, TableAppend};
+pub use append::{AppendWriterImpl, TableAppend};
 pub use lookup::{LookupResult, Lookuper, TableLookup};
 pub use remote_log::{
     DEFAULT_SCANNER_REMOTE_LOG_DOWNLOAD_THREADS, 
DEFAULT_SCANNER_REMOTE_LOG_PREFETCH_NUM,
 };
 pub use scanner::{LogScanner, RecordBatchLogScanner, TableScan};
-pub use writer::{TableWriter, UpsertWriter};
+pub use writer::{AppendWriter, TableWriter, UpsertWriter};

Review Comment:
   `AppendWriter` has changed from a concrete writer type to a trait, and the 
concrete type is now exported as `AppendWriterImpl`. This is a breaking public 
API change (e.g., `fluss::client::AppendWriter` now resolves to a trait). If 
backward compatibility is important, consider keeping the previous public type 
name (e.g., re-export the concrete writer as `AppendWriter` and rename the 
trait to `AppendWriterTrait`), or document/bump the public API version 
accordingly.



##########
crates/fluss/tests/integration/utils.rs:
##########
@@ -102,3 +103,26 @@ pub fn get_cluster(cluster_lock: 
&RwLock<Option<FlussTestingCluster>>) -> Arc<Fl
             .clone(),
     )
 }
+
+/// Creates partitions for a partitioned table.
+///
+/// # Arguments
+/// * `admin` - The FlussAdmin instance
+/// * `table_path` - The table path
+/// * `partition_column` - The partition column name
+/// * `partition_values` - The partition values to create
+pub async fn create_partitions(
+    admin: &FlussAdmin,
+    table_path: &TablePath,
+    partition_column: &str,
+    partition_values: &[&str],
+) {
+    for value in partition_values {
+        let mut partition_map = HashMap::new();
+        partition_map.insert(partition_column.to_string(), value.to_string());
+        admin
+            .create_partition(table_path, &PartitionSpec::new(partition_map), 
true)

Review Comment:
   `create_partitions` hard-codes `ignore_if_exists = true`. This changes the 
behavior compared to the previous inline test code (which passed `false`) and 
can hide duplicate-partition bugs in tests. Consider making `ignore_if_exists` 
a parameter or defaulting to `false` to preserve the stricter behavior.
   ```suggestion
               .create_partition(table_path, 
&PartitionSpec::new(partition_map), false)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to