This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 199af0bf7 [client] Support Scan#createBatchScanner() API to read data
from the whole table (#2794)
199af0bf7 is described below
commit 199af0bf78c3b42347cf9d4c1d78967bfe3dd42d
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Mar 11 00:04:39 2026 +0800
[client] Support Scan#createBatchScanner() API to read data from the whole
table (#2794)
---
.../apache/fluss/client/table/scanner/Scan.java | 4 +
.../fluss/client/table/scanner/TableScan.java | 43 ++++-
.../client/table/scanner/batch/BatchScanUtils.java | 2 +-
.../table/scanner/batch/CompositeBatchScanner.java | 95 +++++++++++
...hScannerITCase.java => BatchScannerITCase.java} | 22 ++-
.../scanner/batch/CompositeBatchScannerTest.java | 189 +++++++++++++++++++++
.../apache/fluss/flink/utils/PushdownUtils.java | 46 +----
7 files changed, 358 insertions(+), 43 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
index 1c05bd391..2b7a5e732 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableBucket;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.List;
/**
@@ -88,4 +89,7 @@ public interface Scan {
* #limit(int)} and only support for Primary Key Tables.
*/
BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId);
+
+ /** Creates a {@link BatchScanner} to read current data in the given table
for this scan. */
+ BatchScanner createBatchScanner() throws IOException;
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 0d1d28a0e..640f2541a 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -21,6 +21,7 @@ import org.apache.fluss.client.FlussConnection;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.client.table.scanner.batch.CompositeBatchScanner;
import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
import org.apache.fluss.client.table.scanner.log.LogScanner;
@@ -29,6 +30,7 @@ import
org.apache.fluss.client.table.scanner.log.TypedLogScanner;
import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.SchemaGetter;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
@@ -36,11 +38,13 @@ import org.apache.fluss.types.RowType;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/** API for configuring and creating {@link LogScanner} and {@link
BatchScanner}. */
public class TableScan implements Scan {
-
private final FlussConnection conn;
private final TableInfo tableInfo;
private final SchemaGetter schemaGetter;
@@ -171,4 +175,41 @@ public class TableScan implements Scan {
tableInfo.getTableConfig().getKvFormat(),
conn.getOrCreateRemoteFileDownloader());
}
+
+ @Override
+ public BatchScanner createBatchScanner() throws IOException {
+ int bucketCount = tableInfo.getNumBuckets();
+ List<TableBucket> tableBuckets;
+ if (tableInfo.isPartitioned()) {
+ List<PartitionInfo> partitionInfos;
+ try {
+ partitionInfos =
conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get();
+ } catch (Exception e) {
+                throw new IOException(
+                        "Failed to list partition infos for table " +
tableInfo.getTablePath(), e);
+ }
+ tableBuckets =
+ partitionInfos.stream()
+ .flatMap(
+ partitionInfo ->
+ IntStream.range(0, bucketCount)
+ .mapToObj(
+ bucketId ->
+ new
TableBucket(
+
tableInfo.getTableId(),
+
partitionInfo
+
.getPartitionId(),
+
bucketId)))
+ .collect(Collectors.toList());
+ } else {
+ tableBuckets =
+ IntStream.range(0, bucketCount)
+ .mapToObj(bucketId -> new
TableBucket(tableInfo.getTableId(), bucketId))
+ .collect(Collectors.toList());
+ }
+
+ List<BatchScanner> scanners =
+
tableBuckets.stream().map(this::createBatchScanner).collect(Collectors.toList());
+ return new CompositeBatchScanner(scanners, limit);
+ }
}
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
index d36bf9a7d..aaf1b9711 100644
---
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
@@ -89,7 +89,7 @@ public class BatchScanUtils {
throw new RuntimeException("Failed to close scanner", e);
}
}
- return rows;
+ return rows.size() > limit ? rows.subList(0, limit) : rows;
}
/**
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java
new file mode 100644
index 000000000..2026e5438
--- /dev/null
+++
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static
org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectLimitedRows;
+
+/**
+ * A {@link BatchScanner} that combines multiple {@link BatchScanner}
instances into a single
+ * scanner. It polls the underlying scanners in a round-robin fashion: each
{@link
+ * #pollBatch(Duration)} call is delegated to the next scanner in the queue,
and scanners that still
+ * have data are re-enqueued while exhausted scanners are closed and removed.
+ *
+ * <p>When a {@code limit} is specified, rows are collected eagerly across all
underlying scanners
+ * up to that limit and returned in a single batch.
+ */
+@Internal
+public class CompositeBatchScanner implements BatchScanner {
+
+ /** Queue of underlying scanners to be polled in order. */
+ private final LinkedList<BatchScanner> scannerQueue;
+
+ /** Optional row limit; when set, rows are collected eagerly up to this
count. */
+ private final @Nullable Integer limit;
+
+ public CompositeBatchScanner(List<BatchScanner> scanners, @Nullable
Integer limit) {
+ this.scannerQueue = new LinkedList<>(scanners);
+ this.limit = limit;
+ }
+
+ @Override
+ public void close() throws IOException {
+        // Close all remaining scanners quietly to avoid resource leaks
+ scannerQueue.forEach(IOUtils::closeQuietly);
+ scannerQueue.clear();
+ }
+
+ @Nullable
+ @Override
+ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws
IOException {
+
+ while (!scannerQueue.isEmpty()) {
+            // Directly return the rows for a limit scan, which doesn't have
much data.
+ if (limit != null) {
+ CloseableIterator<InternalRow> iterator =
+
CloseableIterator.wrap(collectLimitedRows(scannerQueue, limit).iterator());
+ scannerQueue.clear();
+ return iterator;
+ }
+
+ BatchScanner scanner = scannerQueue.poll();
+ try {
+ CloseableIterator<InternalRow> iterator =
scanner.pollBatch(timeout);
+ if (iterator != null) {
+ // If the scanner has more data, add it back to the queue
+ scannerQueue.add(scanner);
+ return iterator;
+ } else {
+                    // Close the scanner if it has no more data, and do not
add it back to the queue
+ scanner.close();
+ }
+ } catch (Exception e) {
+ scannerQueue.add(scanner);
+ throw new IOException("Failed to collect rows", e);
+ }
+ }
+ return null;
+ }
+}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java
similarity index 94%
rename from
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
rename to
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java
index 24415f075..67e1616eb 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java
@@ -53,12 +53,13 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
import static org.assertj.core.api.Assertions.assertThat;
-/** IT Case for {@link KvSnapshotBatchScanner}. */
-class KvSnapshotBatchScannerITCase extends ClientToServerITCaseBase {
+/** IT Case for {@link BatchScanner}. */
+class BatchScannerITCase extends ClientToServerITCaseBase {
private static final int DEFAULT_BUCKET_NUM = 3;
@@ -324,6 +325,23 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
table.close();
}
+ @Test
+ void testTableLevelScanRespectsLimit() throws Exception {
+ TablePath tablePath = TablePath.of(DEFAULT_DB,
"test-table-level-scan-limit");
+ long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
+
+ // insert 3 rows per bucket (9 total across 3 buckets)
+ putRows(tableId, tablePath, 9);
+
+ int limit = 5;
+ try (Table table = conn.getTable(tablePath);
+ BatchScanner scanner =
table.newScan().limit(limit).createBatchScanner()) {
+ List<InternalRow> actual = collectRows(scanner);
+ // collectLimitedRows stops once >= limit rows are collected
+ assertThat(actual.size()).isEqualTo(limit);
+ }
+ }
+
// -------- Utils method
private static int getBucketId(InternalRow row) {
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java
new file mode 100644
index 000000000..129fc028b
--- /dev/null
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompositeBatchScanner}. */
+class CompositeBatchScannerTest {
+
+ private static final Duration TIMEOUT = Duration.ofMillis(10);
+
+ //
-------------------------------------------------------------------------
+ // No-limit tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPollBatchWithNoLimit() throws IOException {
+ // Three scanners each holding rows [0], [1], [2].
+ // CompositeBatchScanner should round-robin and eventually return all
rows.
+ StubBatchScanner s1 = scanner(0);
+ StubBatchScanner s2 = scanner(1);
+ StubBatchScanner s3 = scanner(2);
+
+ CompositeBatchScanner composite =
+ new CompositeBatchScanner(Arrays.asList(s1, s2, s3), null);
+
+ List<InternalRow> collected = collectAll(composite);
+
+ assertThat(collected).hasSize(3);
+ assertThat(intValues(collected)).containsExactlyInAnyOrder(0, 1, 2);
+ assertThat(s1.closed).isTrue();
+ assertThat(s2.closed).isTrue();
+ assertThat(s3.closed).isTrue();
+ }
+
+ @Test
+ void testPollBatchSkipsExhaustedScanner() throws IOException {
+ // s1 is already exhausted (returns null immediately), s2 has data.
+ StubBatchScanner s1 = scanner(); // no rows → immediately returns null
+ StubBatchScanner s2 = scanner(99);
+
+ CompositeBatchScanner composite = new
CompositeBatchScanner(Arrays.asList(s1, s2), null);
+
+ List<InternalRow> collected = collectAll(composite);
+
+ assertThat(intValues(collected)).containsExactly(99);
+ assertThat(s1.closed).isTrue();
+ assertThat(s2.closed).isTrue();
+ }
+
+ @Test
+ void testPollBatchWithLimit() throws IOException {
+ // Two scanners with 3 rows each (one row per batch), limit = 3.
+ // collectLimitedRows collects until rows.size() >= limit.
+ StubBatchScanner s1 = scanner(1, 2, 3);
+ StubBatchScanner s2 = scanner(4, 5, 6);
+
+ CompositeBatchScanner composite = new
CompositeBatchScanner(Arrays.asList(s1, s2), 3);
+
+ CloseableIterator<InternalRow> batch = composite.pollBatch(TIMEOUT);
+ assertThat(batch).isNotNull();
+
+ List<Integer> values = new ArrayList<>();
+ while (batch.hasNext()) {
+ values.add(batch.next().getInt(0));
+ }
+ assertThat(values.size()).isEqualTo(3);
+ }
+
+ @Test
+ void testCloseAllRemainingScannersInQueue() throws IOException {
+ StubBatchScanner s1 = scanner(1, 2);
+ StubBatchScanner s2 = scanner(3, 4);
+
+ CompositeBatchScanner composite = new
CompositeBatchScanner(Arrays.asList(s1, s2), null);
+
+ // Poll one batch so s1 is re-enqueued and s2 stays in queue.
+ composite.pollBatch(TIMEOUT);
+ composite.close();
+
+ assertThat(s1.closed).isTrue();
+ assertThat(s2.closed).isTrue();
+ }
+
+ @Test
+ void testCloseOnEmptyQueueDoesNotThrow() throws IOException {
+ CompositeBatchScanner composite = new
CompositeBatchScanner(Collections.emptyList(), null);
+ composite.close(); // should not throw
+ }
+
+ //
-------------------------------------------------------------------------
+ // Helpers
+ //
-------------------------------------------------------------------------
+ /** Creates a {@link StubBatchScanner} that returns one row per {@code
pollBatch} call. */
+ private static StubBatchScanner scanner(int... values) {
+ return new StubBatchScanner(values);
+ }
+
+ /** Drains the composite scanner and collects all rows. */
+ private static List<InternalRow> collectAll(CompositeBatchScanner
composite)
+ throws IOException {
+ List<InternalRow> rows = new ArrayList<>();
+ CloseableIterator<InternalRow> batch;
+ while ((batch = composite.pollBatch(TIMEOUT)) != null) {
+ while (batch.hasNext()) {
+ rows.add(batch.next());
+ }
+ batch.close();
+ }
+ return rows;
+ }
+
+ private static List<Integer> intValues(List<InternalRow> rows) {
+ List<Integer> values = new ArrayList<>();
+ for (InternalRow row : rows) {
+ values.add(row.getInt(0));
+ }
+ return values;
+ }
+
+ //
-------------------------------------------------------------------------
+ // Stub
+ //
-------------------------------------------------------------------------
+
+ /**
+ * A stub {@link BatchScanner} that returns one row per {@link #pollBatch}
call from a
+ * pre-defined value list, then returns {@code null} once exhausted.
Tracks whether it was
+ * closed.
+ */
+ private static class StubBatchScanner implements BatchScanner {
+
+ private final Queue<Integer> values;
+ boolean closed = false;
+
+ StubBatchScanner(int[] values) {
+ this.values = new LinkedList<>();
+ for (int v : values) {
+ this.values.add(v);
+ }
+ }
+
+ @Nullable
+ @Override
+ public CloseableIterator<InternalRow> pollBatch(Duration timeout) {
+ if (values.isEmpty()) {
+ return null;
+ }
+ GenericRow row = GenericRow.of(values.poll());
+ return
CloseableIterator.wrap(Collections.<InternalRow>singletonList(row).iterator());
+ }
+
+ @Override
+ public void close() {
+ closed = true;
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index 455d1f365..1afae9516 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -23,8 +23,6 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.admin.ListOffsetsResult;
import org.apache.fluss.client.admin.OffsetSpec;
import org.apache.fluss.client.table.Table;
-import org.apache.fluss.client.table.scanner.Scan;
-import org.apache.fluss.client.table.scanner.batch.BatchScanUtils;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.writer.UpsertWriter;
import org.apache.fluss.config.Configuration;
@@ -33,7 +31,6 @@ import org.apache.fluss.exception.UnsupportedVersionException;
import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
import org.apache.fluss.flink.source.lookup.LookupNormalizer;
import org.apache.fluss.metadata.PartitionInfo;
-import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metadata.TableStats;
@@ -75,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static
org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
import static
org.apache.fluss.flink.source.lookup.LookupNormalizer.createPrimaryKeyLookupNormalizer;
import static org.apache.fluss.utils.ExceptionUtils.findThrowable;
@@ -308,42 +306,12 @@ public class PushdownUtils {
int limit = (int) limitRowNum;
try (Connection connection =
ConnectionFactory.createConnection(flussConfig);
Table table = connection.getTable(tablePath);
- Admin flussAdmin = connection.getAdmin()) {
- TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
- int bucketCount = tableInfo.getNumBuckets();
- List<TableBucket> tableBuckets;
- if (tableInfo.isPartitioned()) {
- List<PartitionInfo> partitionInfos =
flussAdmin.listPartitionInfos(tablePath).get();
- tableBuckets =
- partitionInfos.stream()
- .flatMap(
- partitionInfo ->
- IntStream.range(0, bucketCount)
- .mapToObj(
- bucketId ->
- new
TableBucket(
-
tableInfo
-
.getTableId(),
-
partitionInfo
-
.getPartitionId(),
-
bucketId)))
- .collect(Collectors.toList());
- } else {
- tableBuckets =
- IntStream.range(0, bucketCount)
- .mapToObj(
- bucketId ->
- new
TableBucket(tableInfo.getTableId(), bucketId))
- .collect(Collectors.toList());
- }
-
- Scan scan = table.newScan().limit(limit).project(projectedFields);
- List<BatchScanner> scanners =
- tableBuckets.stream()
- .map(scan::createBatchScanner)
- .collect(Collectors.toList());
- List<InternalRow> scannedRows =
BatchScanUtils.collectLimitedRows(scanners, limit);
-
+ BatchScanner batchScanner =
+ table.newScan()
+ .project(projectedFields)
+ .limit(limit)
+ .createBatchScanner()) {
+ List<InternalRow> scannedRows = collectRows(batchScanner);
// convert fluss row into flink row
List<RowData> flinkRows = new ArrayList<>();
FlussRowToFlinkRowConverter flussRowToFlinkRowConverter =