Copilot commented on code in PR #256:
URL: https://github.com/apache/fluss-rust/pull/256#discussion_r2771550471
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -409,6 +409,19 @@ impl LogScannerInner {
Ok(())
}
+ async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket:
i32) -> Result<()> {
+ if !self.is_partitioned_table {
+ return Err(Error::UnsupportedOperation {
+ message: "Can't unsubscribe a partition for a non-partitioned
table.".to_string(),
+ });
+ }
+ let table_bucket =
+ TableBucket::new_with_partition(self.table_id, Some(partition_id),
bucket);
+ self.log_scanner_status
+ .unassign_scan_buckets(from_ref(&table_bucket));
+ Ok(())
+ }
Review Comment:
The unsubscribe_partition method doesn't call
check_and_update_table_metadata like subscribe_partition does. All subscribe
methods (subscribe, subscribe_batch, and subscribe_partition) update the table
metadata before modifying the scanner state. For consistency and to ensure the
scanner has the latest table information, unsubscribe_partition should also
call check_and_update_table_metadata before unassigning buckets. This ensures
that partition metadata is current when unsubscribing.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -487,6 +500,14 @@ impl LogScanner {
.subscribe_partition(partition_id, bucket, offset)
.await
}
+
+ pub async fn unsubscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ ) -> Result<()> {
+ self.inner.unsubscribe_partition(partition_id, bucket).await
+ }
Review Comment:
The API is asymmetric - there are subscribe and subscribe_batch methods for
non-partitioned tables, but no corresponding unsubscribe or unsubscribe_batch
methods. Only unsubscribe_partition is provided for partitioned tables. This
creates an inconsistency where users of non-partitioned tables have no way to
unsubscribe from buckets after subscribing to them. Consider adding unsubscribe
methods for non-partitioned tables to provide a complete and symmetric API.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -409,6 +409,19 @@ impl LogScannerInner {
Ok(())
}
+ async fn unsubscribe_partition(&self, partition_id: PartitionId, bucket:
i32) -> Result<()> {
+ if !self.is_partitioned_table {
+ return Err(Error::UnsupportedOperation {
+ message: "Can't unsubscribe a partition for a non-partitioned
table.".to_string(),
+ });
+ }
+ let table_bucket =
+ TableBucket::new_with_partition(self.table_id, Some(partition_id),
bucket);
+ self.log_scanner_status
+ .unassign_scan_buckets(from_ref(&table_bucket));
+ Ok(())
+ }
Review Comment:
The new unsubscribe_partition method lacks test coverage. There should be
integration tests similar to the existing tests for subscribe_partition (e.g.,
in crates/fluss/tests/integration/log_table.rs) that verify:
1. Successfully unsubscribing from a partition bucket
2. Error handling when trying to unsubscribe from a non-partitioned table
3. Behavior after unsubscribing (e.g., no more records received from that
partition bucket)
This is especially important since the repository has comprehensive
integration test coverage for scanner operations.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]