leekeiabstraction commented on code in PR #222:
URL: https://github.com/apache/fluss-rust/pull/222#discussion_r2747728419
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -54,6 +54,7 @@ pub struct TableScan<'a> {
conn: &'a FlussConnection,
table_info: TableInfo,
metadata: Arc<Metadata>,
+ partition_id: Option<PartitionId>,
Review Comment:
TableScan shouldn't be partition-aware.
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -368,6 +380,21 @@ impl LogScannerInner {
Ok(())
}
+ async fn subscribe_partition(
+ &self,
+ partition_id: PartitionId,
+ bucket: i32,
+ offset: i64,
+ ) -> Result<()> {
+ let table_bucket = TableBucket::new(self.table_id, Some(partition_id), bucket);
Review Comment:
Let's follow the Java-side logic here and return an error if this is not a
partitioned table.
https://github.com/apache/fluss/blob/71b625ff0c1638539f6089eb727a698f080f92b4/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java#L194-L198
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -337,7 +349,7 @@ impl LogScannerInner {
}
async fn subscribe(&self, bucket: i32, offset: i64) -> Result<()> {
Review Comment:
Let's follow the Java-side logic here and return an error if this is a
partitioned table.
https://github.com/apache/fluss/blob/71b625ff0c1638539f6089eb727a698f080f92b4/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java#L175-L180
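To make the two guards concrete, here is a minimal sketch of what I have in mind. `ScannerSketch`, `SubscribeError`, and the `is_partitioned` flag are hypothetical stand-ins, not the crate's actual types; the real check would presumably read the partitioned flag from the table info and return the crate's own error type.

```rust
// Hypothetical error type for illustration only.
#[derive(Debug, PartialEq)]
enum SubscribeError {
    // subscribe() was called on a partitioned table.
    PartitionedTable,
    // subscribe_partition() was called on a non-partitioned table.
    NonPartitionedTable,
}

// Hypothetical stand-in for the scanner; only the flag matters here.
struct ScannerSketch {
    is_partitioned: bool,
}

impl ScannerSketch {
    // Plain subscribe is only valid for non-partitioned tables.
    fn subscribe(&self, _bucket: i32, _offset: i64) -> Result<(), SubscribeError> {
        if self.is_partitioned {
            return Err(SubscribeError::PartitionedTable);
        }
        Ok(())
    }

    // Partition subscribe is only valid for partitioned tables.
    fn subscribe_partition(
        &self,
        _partition_id: i64,
        _bucket: i32,
        _offset: i64,
    ) -> Result<(), SubscribeError> {
        if !self.is_partitioned {
            return Err(SubscribeError::NonPartitionedTable);
        }
        Ok(())
    }
}
```

With both guards in place, each call fails fast on the wrong kind of table instead of registering a bucket that can never be fetched.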
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -618,12 +667,14 @@ impl LogFetcher {
}
async fn check_and_update_metadata(&self) -> Result<()> {
- let need_update = self
+ // Collect buckets that are missing leader information
+ let buckets_needing_leader: Vec<TableBucket> = self
.fetchable_buckets()
- .iter()
- .any(|bucket| self.get_table_bucket_leader(bucket).is_none());
+ .into_iter()
+ .filter(|bucket| self.get_table_bucket_leader(bucket).is_none())
+ .collect();
Review Comment:
+1
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -240,11 +243,17 @@ impl<'a> TableScan<'a> {
self.conn.get_connections(),
self.conn.config(),
self.projected_fields,
+ self.partition_id,
)?;
Ok(RecordBatchLogScanner {
inner: Arc::new(inner),
})
}
+
+ pub fn filter_partition(mut self, partition_id: PartitionId) -> Self {
Review Comment:
Is this API available on the Java side?
This doesn't seem necessary, since on the Java side a scan of a partitioned
table subscribes with a partition ID. Calling subscribe without a partition ID
on a partitioned table results in an exception there.
https://github.com/apache/fluss/blob/71b625ff0c1638539f6089eb727a698f080f92b4/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogScannerImpl.java#L192-L198
##########
crates/fluss/src/client/write/sender.rs:
##########
@@ -330,7 +330,7 @@ impl Sender {
let mut pending_buckets: HashSet<TableBucket> =
request_buckets.iter().cloned().collect();
for bucket_resp in response.buckets_resp() {
- let tb = TableBucket::new(table_id, bucket_resp.bucket_id());
+ let tb = TableBucket::new(table_id, None, bucket_resp.bucket_id());
Review Comment:
Should the partition ID be extracted from the response?
See
https://github.com/apache/fluss/blob/71b625ff0c1638539f6089eb727a698f080f92b4/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java#L460-L462
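A rough sketch of the idea, with hypothetical types: `BucketRespSketch` stands in for the per-bucket response entry and `SenderBucket` for `TableBucket`. I am assuming the response entry exposes the partition ID as an optional field; please check the generated proto type for the actual accessor.

```rust
// Hypothetical stand-in for TableBucket.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SenderBucket {
    table_id: i64,
    partition_id: Option<i64>,
    bucket_id: i32,
}

// Hypothetical stand-in for one per-bucket entry of the response.
struct BucketRespSketch {
    // Assumed to be optional: absent for non-partitioned tables.
    partition_id: Option<i64>,
    bucket_id: i32,
}

// Build the bucket key from the response entry.
fn bucket_from_response(table_id: i64, resp: &BucketRespSketch) -> SenderBucket {
    SenderBucket {
        table_id,
        // Carry the partition ID through instead of hard-coding None.
        partition_id: resp.partition_id,
        bucket_id: resp.bucket_id,
    }
}
```

If the key is built with None for a partitioned table, it will not equal the entry stored in pending_buckets, so the lookup for that bucket silently misses.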
##########
crates/fluss/src/client/table/scanner.rs:
##########
@@ -647,7 +698,7 @@ impl LogFetcher {
return Ok(());
}
- // TODO: Handle PartitionNotExist error
+ // Non-partitioned table: standard metadata refresh
self.metadata
.update_tables_metadata(&HashSet::from([&self.table_path]))
.await
Review Comment:
+1. Did you intend to update metadata only for the buckets with a missing leader?
Note: we should follow the Java-side logic where possible.
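If the intent is a targeted refresh, one possible shape is to reduce the leaderless buckets to the distinct partition IDs that need updating. This is only a sketch of that reading: `ScanBucket` and `partitions_needing_refresh` are hypothetical names, and I have not confirmed that this matches the Java implementation step for step.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical stand-in for TableBucket.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ScanBucket {
    table_id: i64,
    partition_id: Option<i64>,
    bucket_id: i32,
}

// Returns the distinct partition IDs that own at least one bucket
// without a known leader. Buckets with no partition ID are skipped.
fn partitions_needing_refresh(
    buckets: &[ScanBucket],
    leaders: &HashMap<ScanBucket, i32>,
) -> BTreeSet<i64> {
    buckets
        .iter()
        // Keep only buckets whose leader is unknown.
        .filter(|b| !leaders.contains_key(*b))
        // Keep only partitioned buckets and project to the partition ID.
        .filter_map(|b| b.partition_id)
        .collect()
}
```

An empty result for a partitioned table would then mean no partition metadata refresh is needed on this pass.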
##########
crates/fluss/src/cluster/cluster.rs:
##########
@@ -273,7 +273,7 @@ impl Cluster {
bucket_id: BucketId,
) -> Result<TableBucket> {
let table_info = self.get_table(table_path)?;
- Ok(TableBucket::new(table_info.table_id, bucket_id))
+ Ok(TableBucket::new(table_info.table_id, None, bucket_id))
Review Comment:
I don't think we can use None here. This function is used to group requests
for the same bucket, including buckets of partitioned tables. If we override
the partition ID with None here, the accumulator will look up the leader for a
nonexistent bucket when the table is partitioned.
##########
crates/fluss/src/client/admin.rs:
##########
@@ -332,7 +332,7 @@ impl FlussAdmin {
let mut node_for_bucket_list: HashMap<i32, Vec<BucketId>> =
HashMap::new();
for bucket_id in buckets {
- let table_bucket = TableBucket::new(table_id, *bucket_id);
+ let table_bucket = TableBucket::new(table_id, None, *bucket_id);
Review Comment:
The partition ID is one of this function's arguments; pass that in instead of None.