Copilot commented on code in PR #256:
URL: https://github.com/apache/fluss-rust/pull/256#discussion_r2771550471


##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -409,6 +409,19 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: 
i32) -> Result<()> {
+        if !self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "Can't unsubscribe a partition for a non-partitioned 
table.".to_string(),
+            });
+        }
+        let table_bucket =
+            TableBucket::new_with_partition(self.table_id, Some(partition_id), 
bucket);
+        self.log_scanner_status
+            .unassign_scan_buckets(from_ref(&table_bucket));
+        Ok(())
+    }

Review Comment:
   The unsubscribe_partition method doesn't call 
check_and_update_table_metadata like subscribe_partition does. All subscribe 
methods (subscribe, subscribe_batch, and subscribe_partition) update the table 
metadata before modifying the scanner state. For consistency and to ensure the 
scanner has the latest table information, unsubscribe_partition should also 
call check_and_update_table_metadata before unassigning buckets. This ensures 
that partition metadata is current when unsubscribing.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -487,6 +500,14 @@ impl LogScanner {
             .subscribe_partition(partition_id, bucket, offset)
             .await
     }
+
+    pub async fn unsubscribe_partition(
+        &self,
+        partition_id: PartitionId,
+        bucket: i32,
+    ) -> Result<()> {
+        self.inner.unsubscribe_partition(partition_id, bucket).await
+    }

Review Comment:
   The API is asymmetric - there are subscribe and subscribe_batch methods for 
non-partitioned tables, but no corresponding unsubscribe or unsubscribe_batch 
methods. Only unsubscribe_partition is provided for partitioned tables. This 
creates an inconsistency where users of non-partitioned tables have no way to 
unsubscribe from buckets after subscribing to them. Consider adding unsubscribe 
methods for non-partitioned tables to provide a complete and symmetric API.



##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -409,6 +409,19 @@ impl LogScannerInner {
         Ok(())
     }
 
+    async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket: 
i32) -> Result<()> {
+        if !self.is_partitioned_table {
+            return Err(Error::UnsupportedOperation {
+                message: "Can't unsubscribe a partition for a non-partitioned 
table.".to_string(),
+            });
+        }
+        let table_bucket =
+            TableBucket::new_with_partition(self.table_id, Some(partition_id), 
bucket);
+        self.log_scanner_status
+            .unassign_scan_buckets(from_ref(&table_bucket));
+        Ok(())
+    }

Review Comment:
   The new unsubscribe_partition method lacks test coverage. There should be 
integration tests similar to the existing tests for subscribe_partition (e.g., 
in crates/fluss/tests/integration/log_table.rs) that verify:
   1. Successfully unsubscribing from a partition bucket
   2. Error handling when trying to unsubscribe from a non-partitioned table
   3. Behavior after unsubscribing (e.g., no more records received from that 
partition bucket)
   
   This is especially important since the repository has comprehensive 
integration test coverage for scanner operations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to