This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 199af0bf7 [client] Support Scan#createBatchScanner() API to read data 
from the whole table (#2794)
199af0bf7 is described below

commit 199af0bf78c3b42347cf9d4c1d78967bfe3dd42d
Author: Hongshun Wang <[email protected]>
AuthorDate: Wed Mar 11 00:04:39 2026 +0800

    [client] Support Scan#createBatchScanner() API to read data from the whole 
table (#2794)
---
 .../apache/fluss/client/table/scanner/Scan.java    |   4 +
 .../fluss/client/table/scanner/TableScan.java      |  43 ++++-
 .../client/table/scanner/batch/BatchScanUtils.java |   2 +-
 .../table/scanner/batch/CompositeBatchScanner.java |  95 +++++++++++
 ...hScannerITCase.java => BatchScannerITCase.java} |  22 ++-
 .../scanner/batch/CompositeBatchScannerTest.java   | 189 +++++++++++++++++++++
 .../apache/fluss/flink/utils/PushdownUtils.java    |  46 +----
 7 files changed, 358 insertions(+), 43 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
index 1c05bd391..2b7a5e732 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/Scan.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableBucket;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -88,4 +89,7 @@ public interface Scan {
      * #limit(int)} and only support for Primary Key Tables.
      */
     BatchScanner createBatchScanner(TableBucket tableBucket, long snapshotId);
+
+    /** Creates a {@link BatchScanner} to read current data in the given table 
for this scan. */
+    BatchScanner createBatchScanner() throws IOException;
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
index 0d1d28a0e..640f2541a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java
@@ -21,6 +21,7 @@ import org.apache.fluss.client.FlussConnection;
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.KvSnapshotMetadata;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.client.table.scanner.batch.CompositeBatchScanner;
 import org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScanner;
 import org.apache.fluss.client.table.scanner.batch.LimitBatchScanner;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
@@ -29,6 +30,7 @@ import 
org.apache.fluss.client.table.scanner.log.TypedLogScanner;
 import org.apache.fluss.client.table.scanner.log.TypedLogScannerImpl;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.SchemaGetter;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
@@ -36,11 +38,13 @@ import org.apache.fluss.types.RowType;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 /** API for configuring and creating {@link LogScanner} and {@link 
BatchScanner}. */
 public class TableScan implements Scan {
-
     private final FlussConnection conn;
     private final TableInfo tableInfo;
     private final SchemaGetter schemaGetter;
@@ -171,4 +175,41 @@ public class TableScan implements Scan {
                 tableInfo.getTableConfig().getKvFormat(),
                 conn.getOrCreateRemoteFileDownloader());
     }
+
+    @Override
+    public BatchScanner createBatchScanner() throws IOException {
+        int bucketCount = tableInfo.getNumBuckets();
+        List<TableBucket> tableBuckets;
+        if (tableInfo.isPartitioned()) {
+            List<PartitionInfo> partitionInfos;
+            try {
+                partitionInfos = 
conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get();
+            } catch (Exception e) {
+                throw new IOException(
+                        "Failed to list partition infos for table" + 
tableInfo.getTablePath(), e);
+            }
+            tableBuckets =
+                    partitionInfos.stream()
+                            .flatMap(
+                                    partitionInfo ->
+                                            IntStream.range(0, bucketCount)
+                                                    .mapToObj(
+                                                            bucketId ->
+                                                                    new 
TableBucket(
+                                                                            
tableInfo.getTableId(),
+                                                                            
partitionInfo
+                                                                               
     .getPartitionId(),
+                                                                            
bucketId)))
+                            .collect(Collectors.toList());
+        } else {
+            tableBuckets =
+                    IntStream.range(0, bucketCount)
+                            .mapToObj(bucketId -> new 
TableBucket(tableInfo.getTableId(), bucketId))
+                            .collect(Collectors.toList());
+        }
+
+        List<BatchScanner> scanners =
+                
tableBuckets.stream().map(this::createBatchScanner).collect(Collectors.toList());
+        return new CompositeBatchScanner(scanners, limit);
+    }
 }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
index d36bf9a7d..aaf1b9711 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/BatchScanUtils.java
@@ -89,7 +89,7 @@ public class BatchScanUtils {
                 throw new RuntimeException("Failed to close scanner", e);
             }
         }
-        return rows;
+        return rows.size() > limit ? rows.subList(0, limit) : rows;
     }
 
     /**
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java
new file mode 100644
index 000000000..2026e5438
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScanner.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static 
org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectLimitedRows;
+
+/**
+ * A {@link BatchScanner} that combines multiple {@link BatchScanner} 
instances into a single
+ * scanner. It polls the underlying scanners in a round-robin fashion: each 
{@link
+ * #pollBatch(Duration)} call is delegated to the next scanner in the queue, 
and scanners that still
+ * have data are re-enqueued while exhausted scanners are closed and removed.
+ *
+ * <p>When a {@code limit} is specified, rows are collected eagerly across all 
underlying scanners
+ * up to that limit and returned in a single batch.
+ */
+@Internal
+public class CompositeBatchScanner implements BatchScanner {
+
+    /** Queue of underlying scanners to be polled in order. */
+    private final LinkedList<BatchScanner> scannerQueue;
+
+    /** Optional row limit; when set, rows are collected eagerly up to this 
count. */
+    private final @Nullable Integer limit;
+
+    public CompositeBatchScanner(List<BatchScanner> scanners, @Nullable 
Integer limit) {
+        this.scannerQueue = new LinkedList<>(scanners);
+        this.limit = limit;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Ensure all scanners are closed on failure to avoid resource leaks
+        scannerQueue.forEach(IOUtils::closeQuietly);
+        scannerQueue.clear();
+    }
+
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws 
IOException {
+
+        while (!scannerQueue.isEmpty()) {
+            // Direct return limit scan which don't have so much data.
+            if (limit != null) {
+                CloseableIterator<InternalRow> iterator =
+                        
CloseableIterator.wrap(collectLimitedRows(scannerQueue, limit).iterator());
+                scannerQueue.clear();
+                return iterator;
+            }
+
+            BatchScanner scanner = scannerQueue.poll();
+            try {
+                CloseableIterator<InternalRow> iterator = 
scanner.pollBatch(timeout);
+                if (iterator != null) {
+                    // If the scanner has more data, add it back to the queue
+                    scannerQueue.add(scanner);
+                    return iterator;
+                } else {
+                    // Close the scanner if it has no more data, and not add 
it back to the queue
+                    scanner.close();
+                }
+            } catch (Exception e) {
+                scannerQueue.add(scanner);
+                throw new IOException("Failed to collect rows", e);
+            }
+        }
+        return null;
+    }
+}
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java
similarity index 94%
rename from 
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
rename to 
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java
index 24415f075..67e1616eb 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/BatchScannerITCase.java
@@ -53,12 +53,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** IT Case for {@link KvSnapshotBatchScanner}. */
-class KvSnapshotBatchScannerITCase extends ClientToServerITCaseBase {
+/** IT Case for {@link BatchScanner}. */
+class BatchScannerITCase extends ClientToServerITCaseBase {
 
     private static final int DEFAULT_BUCKET_NUM = 3;
 
@@ -324,6 +325,23 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
         table.close();
     }
 
+    @Test
+    void testTableLevelScanRespectsLimit() throws Exception {
+        TablePath tablePath = TablePath.of(DEFAULT_DB, 
"test-table-level-scan-limit");
+        long tableId = createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, true);
+
+        // insert 3 rows per bucket (9 total across 3 buckets)
+        putRows(tableId, tablePath, 9);
+
+        int limit = 5;
+        try (Table table = conn.getTable(tablePath);
+                BatchScanner scanner = 
table.newScan().limit(limit).createBatchScanner()) {
+            List<InternalRow> actual = collectRows(scanner);
+            // collectLimitedRows stops once >= limit rows are collected
+            assertThat(actual.size()).isEqualTo(limit);
+        }
+    }
+
     // -------- Utils method
 
     private static int getBucketId(InternalRow row) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java
new file mode 100644
index 000000000..129fc028b
--- /dev/null
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/CompositeBatchScannerTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.table.scanner.batch;
+
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompositeBatchScanner}. */
+class CompositeBatchScannerTest {
+
+    private static final Duration TIMEOUT = Duration.ofMillis(10);
+
+    // 
-------------------------------------------------------------------------
+    // No-limit tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testPollBatchWithNoLimit() throws IOException {
+        // Three scanners each holding rows [0], [1], [2].
+        // CompositeBatchScanner should round-robin and eventually return all 
rows.
+        StubBatchScanner s1 = scanner(0);
+        StubBatchScanner s2 = scanner(1);
+        StubBatchScanner s3 = scanner(2);
+
+        CompositeBatchScanner composite =
+                new CompositeBatchScanner(Arrays.asList(s1, s2, s3), null);
+
+        List<InternalRow> collected = collectAll(composite);
+
+        assertThat(collected).hasSize(3);
+        assertThat(intValues(collected)).containsExactlyInAnyOrder(0, 1, 2);
+        assertThat(s1.closed).isTrue();
+        assertThat(s2.closed).isTrue();
+        assertThat(s3.closed).isTrue();
+    }
+
+    @Test
+    void testPollBatchSkipsExhaustedScanner() throws IOException {
+        // s1 is already exhausted (returns null immediately), s2 has data.
+        StubBatchScanner s1 = scanner(); // no rows → immediately returns null
+        StubBatchScanner s2 = scanner(99);
+
+        CompositeBatchScanner composite = new 
CompositeBatchScanner(Arrays.asList(s1, s2), null);
+
+        List<InternalRow> collected = collectAll(composite);
+
+        assertThat(intValues(collected)).containsExactly(99);
+        assertThat(s1.closed).isTrue();
+        assertThat(s2.closed).isTrue();
+    }
+
+    @Test
+    void testPollBatchWithLimit() throws IOException {
+        // Two scanners with 3 rows each (one row per batch), limit = 3.
+        // collectLimitedRows collects until rows.size() >= limit.
+        StubBatchScanner s1 = scanner(1, 2, 3);
+        StubBatchScanner s2 = scanner(4, 5, 6);
+
+        CompositeBatchScanner composite = new 
CompositeBatchScanner(Arrays.asList(s1, s2), 3);
+
+        CloseableIterator<InternalRow> batch = composite.pollBatch(TIMEOUT);
+        assertThat(batch).isNotNull();
+
+        List<Integer> values = new ArrayList<>();
+        while (batch.hasNext()) {
+            values.add(batch.next().getInt(0));
+        }
+        assertThat(values.size()).isEqualTo(3);
+    }
+
+    @Test
+    void testCloseAllRemainingScannersInQueue() throws IOException {
+        StubBatchScanner s1 = scanner(1, 2);
+        StubBatchScanner s2 = scanner(3, 4);
+
+        CompositeBatchScanner composite = new 
CompositeBatchScanner(Arrays.asList(s1, s2), null);
+
+        // Poll one batch so s1 is re-enqueued and s2 stays in queue.
+        composite.pollBatch(TIMEOUT);
+        composite.close();
+
+        assertThat(s1.closed).isTrue();
+        assertThat(s2.closed).isTrue();
+    }
+
+    @Test
+    void testCloseOnEmptyQueueDoesNotThrow() throws IOException {
+        CompositeBatchScanner composite = new 
CompositeBatchScanner(Collections.emptyList(), null);
+        composite.close(); // should not throw
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Helpers
+    // 
-------------------------------------------------------------------------
+    /** Creates a {@link StubBatchScanner} that returns one row per {@code 
pollBatch} call. */
+    private static StubBatchScanner scanner(int... values) {
+        return new StubBatchScanner(values);
+    }
+
+    /** Drains the composite scanner and collects all rows. */
+    private static List<InternalRow> collectAll(CompositeBatchScanner 
composite)
+            throws IOException {
+        List<InternalRow> rows = new ArrayList<>();
+        CloseableIterator<InternalRow> batch;
+        while ((batch = composite.pollBatch(TIMEOUT)) != null) {
+            while (batch.hasNext()) {
+                rows.add(batch.next());
+            }
+            batch.close();
+        }
+        return rows;
+    }
+
+    private static List<Integer> intValues(List<InternalRow> rows) {
+        List<Integer> values = new ArrayList<>();
+        for (InternalRow row : rows) {
+            values.add(row.getInt(0));
+        }
+        return values;
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Stub
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * A stub {@link BatchScanner} that returns one row per {@link #pollBatch} 
call from a
+     * pre-defined value list, then returns {@code null} once exhausted. 
Tracks whether it was
+     * closed.
+     */
+    private static class StubBatchScanner implements BatchScanner {
+
+        private final Queue<Integer> values;
+        boolean closed = false;
+
+        StubBatchScanner(int[] values) {
+            this.values = new LinkedList<>();
+            for (int v : values) {
+                this.values.add(v);
+            }
+        }
+
+        @Nullable
+        @Override
+        public CloseableIterator<InternalRow> pollBatch(Duration timeout) {
+            if (values.isEmpty()) {
+                return null;
+            }
+            GenericRow row = GenericRow.of(values.poll());
+            return 
CloseableIterator.wrap(Collections.<InternalRow>singletonList(row).iterator());
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index 455d1f365..1afae9516 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -23,8 +23,6 @@ import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.admin.ListOffsetsResult;
 import org.apache.fluss.client.admin.OffsetSpec;
 import org.apache.fluss.client.table.Table;
-import org.apache.fluss.client.table.scanner.Scan;
-import org.apache.fluss.client.table.scanner.batch.BatchScanUtils;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.Configuration;
@@ -33,7 +31,6 @@ import org.apache.fluss.exception.UnsupportedVersionException;
 import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
 import org.apache.fluss.metadata.PartitionInfo;
-import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metadata.TableStats;
@@ -75,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
 import static 
org.apache.fluss.flink.source.lookup.LookupNormalizer.createPrimaryKeyLookupNormalizer;
 import static org.apache.fluss.utils.ExceptionUtils.findThrowable;
 
@@ -308,42 +306,12 @@ public class PushdownUtils {
         int limit = (int) limitRowNum;
         try (Connection connection = 
ConnectionFactory.createConnection(flussConfig);
                 Table table = connection.getTable(tablePath);
-                Admin flussAdmin = connection.getAdmin()) {
-            TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
-            int bucketCount = tableInfo.getNumBuckets();
-            List<TableBucket> tableBuckets;
-            if (tableInfo.isPartitioned()) {
-                List<PartitionInfo> partitionInfos = 
flussAdmin.listPartitionInfos(tablePath).get();
-                tableBuckets =
-                        partitionInfos.stream()
-                                .flatMap(
-                                        partitionInfo ->
-                                                IntStream.range(0, bucketCount)
-                                                        .mapToObj(
-                                                                bucketId ->
-                                                                        new 
TableBucket(
-                                                                               
 tableInfo
-                                                                               
         .getTableId(),
-                                                                               
 partitionInfo
-                                                                               
         .getPartitionId(),
-                                                                               
 bucketId)))
-                                .collect(Collectors.toList());
-            } else {
-                tableBuckets =
-                        IntStream.range(0, bucketCount)
-                                .mapToObj(
-                                        bucketId ->
-                                                new 
TableBucket(tableInfo.getTableId(), bucketId))
-                                .collect(Collectors.toList());
-            }
-
-            Scan scan = table.newScan().limit(limit).project(projectedFields);
-            List<BatchScanner> scanners =
-                    tableBuckets.stream()
-                            .map(scan::createBatchScanner)
-                            .collect(Collectors.toList());
-            List<InternalRow> scannedRows = 
BatchScanUtils.collectLimitedRows(scanners, limit);
-
+                BatchScanner batchScanner =
+                        table.newScan()
+                                .project(projectedFields)
+                                .limit(limit)
+                                .createBatchScanner()) {
+            List<InternalRow> scannedRows = collectRows(batchScanner);
             // convert fluss row into flink row
             List<RowData> flinkRows = new ArrayList<>();
             FlussRowToFlinkRowConverter flussRowToFlinkRowConverter =

Reply via email to