Copilot commented on code in PR #229:
URL: https://github.com/apache/fluss-rust/pull/229#discussion_r2748688345


##########
crates/fluss/src/client/admin.rs:
##########
@@ -294,23 +294,69 @@ impl FlussAdmin {
         buckets_id: &[BucketId],
         offset_spec: OffsetSpec,
     ) -> Result<HashMap<i32, i64>> {
-        self.metadata
-            .check_and_update_table_metadata(from_ref(table_path))
-            .await?;
+        self.do_list_offsets(table_path, None, buckets_id, offset_spec)
+            .await
+    }
+
+    /// List offset for the specified buckets in a partition. This operation 
enables to find
+    /// the beginning offset, end offset as well as the offset matching a 
timestamp in buckets.
+    pub async fn list_partition_offsets(
+        &self,
+        table_path: &TablePath,
+        partition_name: &str,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
+        self.do_list_offsets(table_path, Some(partition_name), buckets_id, 
offset_spec)
+            .await
+    }
 
+    async fn do_list_offsets(
+        &self,
+        table_path: &TablePath,
+        partition_name: Option<&str>,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
         if buckets_id.is_empty() {
-            return Err(Error::UnexpectedError {
+            return Err(Error::IllegalArgument {
                 message: "Buckets are empty.".to_string(),
-                source: None,
             });
         }
 
+        // force to update table metadata like java side
+        self.metadata.update_table_metadata(table_path).await?;
+
         let cluster = self.metadata.get_cluster();
         let table_id = cluster.get_table(table_path)?.table_id;
 
+        // Resolve partition_id from partition_name if provided
+        let partition_id = if let Some(name) = partition_name {
+            let physical_table_path = 
Arc::new(PhysicalTablePath::of_partitioned(
+                Arc::new(table_path.clone()),
+                Some(name.to_string()),
+            ));
+
+            // Update partition metadata like java side
+            self.metadata
+                .update_physical_table_metadata(from_ref(&physical_table_path))

Review Comment:
   `std::array::from_ref` returns an array of references (`[&T; 1]`), but 
`update_physical_table_metadata` expects a slice of `Arc<PhysicalTablePath>` 
(`&[Arc<PhysicalTablePath>]`), so `from_ref(&physical_table_path)` here does 
not have the correct type and this code will not compile. You likely want to 
use `std::slice::from_ref` so that the call produces a 
`&[Arc<PhysicalTablePath>]` matching the method signature, or otherwise adjust 
the helper signature to accept the array type you are constructing.
   ```suggestion
                   
.update_physical_table_metadata(std::slice::from_ref(&physical_table_path))
   ```



##########
crates/fluss/tests/integration/log_table.rs:
##########
@@ -1072,6 +1072,29 @@ mod table_test {
             .await
             .expect("Failed to flush batches");
 
+        // Test list_offsets_for_partition

Review Comment:
   The comment `// Test list_offsets_for_partition` refers to a function name 
that doesn't exist; the actual API introduced here is `list_partition_offsets`. 
To avoid confusion for future readers, please update the comment to use the 
correct method name.
   ```suggestion
           // Test list_partition_offsets
   ```



##########
crates/fluss/src/client/admin.rs:
##########
@@ -294,23 +294,69 @@ impl FlussAdmin {
         buckets_id: &[BucketId],
         offset_spec: OffsetSpec,
     ) -> Result<HashMap<i32, i64>> {
-        self.metadata
-            .check_and_update_table_metadata(from_ref(table_path))
-            .await?;
+        self.do_list_offsets(table_path, None, buckets_id, offset_spec)
+            .await
+    }
+
+    /// List offset for the specified buckets in a partition. This operation 
enables to find
+    /// the beginning offset, end offset as well as the offset matching a 
timestamp in buckets.
+    pub async fn list_partition_offsets(
+        &self,
+        table_path: &TablePath,
+        partition_name: &str,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
+        self.do_list_offsets(table_path, Some(partition_name), buckets_id, 
offset_spec)
+            .await
+    }
 
+    async fn do_list_offsets(
+        &self,
+        table_path: &TablePath,
+        partition_name: Option<&str>,
+        buckets_id: &[BucketId],
+        offset_spec: OffsetSpec,
+    ) -> Result<HashMap<i32, i64>> {
         if buckets_id.is_empty() {
-            return Err(Error::UnexpectedError {
+            return Err(Error::IllegalArgument {
                 message: "Buckets are empty.".to_string(),
-                source: None,
             });
         }
 
+        // force to update table metadata like java side
+        self.metadata.update_table_metadata(table_path).await?;
+
         let cluster = self.metadata.get_cluster();
         let table_id = cluster.get_table(table_path)?.table_id;
 
+        // Resolve partition_id from partition_name if provided
+        let partition_id = if let Some(name) = partition_name {
+            let physical_table_path = 
Arc::new(PhysicalTablePath::of_partitioned(
+                Arc::new(table_path.clone()),
+                Some(name.to_string()),
+            ));
+
+            // Update partition metadata like java side
+            self.metadata
+                .update_physical_table_metadata(from_ref(&physical_table_path))
+                .await?;
+

Review Comment:
   In `do_list_offsets`, the sequence `update_table_metadata(table_path)` 
followed by `update_physical_table_metadata(from_ref(&physical_table_path))` 
results in two identical metadata update RPCs for the partitioned case: 
`update_physical_table_metadata` currently only adds the table path (not the 
partition path) to `update_table_paths`, so both calls end up invoking 
`update_tables_metadata` with the same `{table_path}` set. This redundant 
round-trip adds unnecessary latency to `list_partition_offsets`; consider 
either removing the second call or updating `update_physical_table_metadata` so 
that it actually uses the `PhysicalTablePath` to request partition-specific 
metadata instead of reusing only the logical table path.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to