This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new f89bcb38 [FLINK-27307] Add test for sequential read within bucket for append-only table
f89bcb38 is described below

commit f89bcb386d410c8cc3edb771c74a27320be059c2
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Mon Aug 1 15:18:23 2022 +0800

    [FLINK-27307] Add test for sequential read within bucket for append-only table
    
    This closes #256
---
 .../flink/table/store/table/source/TableScan.java  |  6 ++
 .../store/table/AppendOnlyFileStoreTableTest.java  | 64 ++++++++++++++++++++++
 .../table/store/table/FileStoreTableTestBase.java  |  6 +-
 3 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 5a9fc05b..b7417dfc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -97,6 +97,12 @@ public abstract class TableScan {
         return this;
     }
 
+    @VisibleForTesting
+    public TableScan withBucket(int bucket) {
+        scan.withBucket(bucket);
+        return this;
+    }
+
     public Plan plan() {
         FileStoreScan.Plan plan = scan.plan();
         return new Plan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
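
The withBucket hook above lets a test pin the scan plan to a single bucket. A minimal usage sketch (a hypothetical test snippet; the identifiers are the ones appearing in this mail, the bucket number is arbitrary):

    // Plan only the splits of bucket 0 and prepare a reader for them.
    FileStoreTable table = createFileStoreTable();
    List<Split> splits = table.newScan()
            .withBucket(0)   // new @VisibleForTesting hook
            .plan()
            .splits;
    TableRead read = table.newRead();
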
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 32ef5e7a..9cc7ed32 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -34,11 +37,18 @@ import org.apache.flink.table.store.table.source.TableRead;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.table.sink.SinkRecordConverter.bucket;
+import static org.apache.flink.table.store.table.sink.SinkRecordConverter.hashcode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
@@ -135,6 +145,60 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
     }
 
+    @Test
+    public void testSequentialRead() throws Exception {
+        Random random = new Random();
+        int numOfBucket = Math.max(random.nextInt(8), 1);
+        FileStoreTable table = createFileStoreTable(numOfBucket);
+        RowDataSerializer serializer = new RowDataSerializer(table.schema().logicalRowType());
+        TableWrite write = table.newWrite();
+
+        TableCommit commit = table.newCommit("user");
+        List<Map<Integer, List<RowData>>> dataset = new ArrayList<>();
+        Map<Integer, List<RowData>> dataPerBucket = new HashMap<>(numOfBucket);
+        int numOfPartition = Math.max(random.nextInt(10), 1);
+        for (int i = 0; i < numOfPartition; i++) {
+            for (int j = 0; j < Math.max(random.nextInt(200), 1); j++) {
+                BinaryRowData data =
+                        serializer
+                                .toBinaryRow(
+                                        GenericRowData.of(i, random.nextInt(), random.nextLong()))
+                                .copy();
+                int bucket = bucket(hashcode(data), numOfBucket);
+                dataPerBucket.compute(
+                        bucket,
+                        (k, v) -> {
+                            if (v == null) {
+                                v = new ArrayList<>();
+                            }
+                            v.add(data);
+                            return v;
+                        });
+                write.write(data);
+            }
+            dataset.add(new HashMap<>(dataPerBucket));
+            dataPerBucket.clear();
+        }
+        commit.commit("0", write.prepareCommit(true));
+
+        int partition = random.nextInt(numOfPartition);
+        List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet());
+        int bucket = availableBucket.get(random.nextInt(availableBucket.size()));
+
+        Predicate partitionFilter =
+                new PredicateBuilder(table.schema().logicalRowType()).equal(0, partition);
+        List<Split> splits =
+                table.newScan().withFilter(partitionFilter).withBucket(bucket).plan().splits;
+        TableRead read = table.newRead();
+        getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING);
+
+        assertThat(getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING))
+                .containsExactlyElementsOf(
+                        dataset.get(partition).get(bucket).stream()
+                                .map(STREAMING_ROW_TO_STRING)
+                                .collect(Collectors.toList()));
+    }
+
     private void writeData() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
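
Note on the expected data in the test above: rows are grouped with SinkRecordConverter.hashcode and SinkRecordConverter.bucket, i.e. the same routing the writer applies, so the expected list holds exactly the rows written to the chosen bucket, in write order. As a mental model only (an assumption for illustration, not the actual SinkRecordConverter code), such a mapping is typically a non-negative modulo:

    // Sketch: one plausible hash-to-bucket mapping.
    static int bucket(int hashcode, int numOfBucket) {
        return Math.abs(hashcode % numOfBucket);
    }

Since the table is append-only, rows within one bucket keep their write order, which is exactly what the containsExactlyElementsOf assertion verifies.
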
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index a091c12c..d54f6703 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -215,8 +215,12 @@ public abstract class FileStoreTableTestBase {
         return b;
     }
 
+    protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
+        return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
+    }
+
     protected FileStoreTable createFileStoreTable() throws Exception {
-        return createFileStoreTable(conf -> {});
+        return createFileStoreTable(1);
     }
 
     protected abstract FileStoreTable createFileStoreTable(Consumer<Configuration> configure)

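The base-class change above keeps existing tests on a single bucket by default while letting a test pick the bucket count explicitly, e.g. (a hypothetical call using the new overload):

    // A table with 4 buckets; the overload sets the BUCKET option.
    FileStoreTable table = createFileStoreTable(4);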