luoyuxia commented on code in PR #273:
URL: https://github.com/apache/fluss-rust/pull/273#discussion_r2778425910
##########
bindings/cpp/include/fluss.hpp:
##########
@@ -713,6 +723,7 @@ class LogScanner {
Result Subscribe(int32_t bucket_id, int64_t start_offset);
Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
Result SubscribePartition(int64_t partition_id, int32_t bucket_id, int64_t
start_offset);
+ Result SubscribePartitionBuckets(const
std::vector<PartitionBucketSubscription>& subscriptions);
Review Comment:
nit:
change to `SubscribePartitionBuckets`, since we also use `Subscribe` for
both batch subscriptions and single-bucket subscriptions
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -409,6 +409,39 @@ impl LogScannerInner {
Ok(())
}
+ async fn subscribe_partition_buckets(
Review Comment:
nit: maybe we can extract the common logic?
```
async fn subscribe_buckets(&self, bucket_offsets: &HashMap<i32, i64>) ->
Result<()> {
if self.is_partitioned_table {
return Err(Error::UnsupportedOperation {
message:
"The table is a partitioned table, please use
\"subscribe_partition_buckets\" instead."
.to_string(),
});
}
let mut scan_bucket_offsets = HashMap::new();
for (bucket_id, offset) in bucket_offsets {
let table_bucket = TableBucket::new(self.table_id, *bucket_id);
scan_bucket_offsets.insert(table_bucket, *offset);
}
self.do_subscribe_buckets(
scan_bucket_offsets,
).await
}
async fn subscribe_partition_buckets(
&self,
partition_bucket_offsets: &HashMap<(PartitionId, i32), i64>,
) -> Result<()> {
if !self.is_partitioned_table {
return Err(UnsupportedOperation {
message: "The table is not a partitioned table, please use
\"subscribe_buckets\" \
to subscribe to non-partitioned buckets instead."
.to_string(),
});
}
let mut scan_bucket_offsets = HashMap::new();
for (&(partition_id, bucket_id), &offset) in
partition_bucket_offsets {
let table_bucket =
TableBucket::new_with_partition(self.table_id,
Some(partition_id), bucket_id);
scan_bucket_offsets.insert(table_bucket, offset);
}
self.do_subscribe_buckets(
scan_bucket_offsets).await
}
async fn do_subscribe_buckets(&self, bucket_offsets: HashMap<TableBucket,
i64>) -> Result<()> {
if bucket_offsets.is_empty() {
return Err(Error::UnexpectedError {
message: "Bucket offsets are empty.".to_string(),
source: None,
});
}
self.metadata
.check_and_update_table_metadata(from_ref(&self.table_path))
.await?;
self.log_scanner_status.assign_scan_buckets(bucket_offsets);
Ok(())
}
```
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]