This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new f89bcb38 [FLINK-27307] Add test for sequential read within bucket for append-only table
f89bcb38 is described below

commit f89bcb386d410c8cc3edb771c74a27320be059c2
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Mon Aug 1 15:18:23 2022 +0800

    [FLINK-27307] Add test for sequential read within bucket for append-only table

    This closes #256
---
 .../flink/table/store/table/source/TableScan.java |  6 ++
 .../store/table/AppendOnlyFileStoreTableTest.java | 64 ++++++++++++++++++++++
 .../table/store/table/FileStoreTableTestBase.java |  6 +-
 3 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index 5a9fc05b..b7417dfc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -97,6 +97,12 @@ public abstract class TableScan {
         return this;
     }
 
+    @VisibleForTesting
+    public TableScan withBucket(int bucket) {
+        scan.withBucket(bucket);
+        return this;
+    }
+
     public Plan plan() {
         FileStoreScan.Plan plan = scan.plan();
         return new Plan(plan.snapshotId(), generateSplits(plan.groupByPartFiles()));
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index 32ef5e7a..9cc7ed32 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -20,6 +20,9 @@ package org.apache.flink.table.store.table;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.WriteMode;
 import org.apache.flink.table.store.file.predicate.Predicate;
@@ -34,11 +37,18 @@ import org.apache.flink.table.store.table.source.TableRead;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.table.sink.SinkRecordConverter.bucket;
+import static org.apache.flink.table.store.table.sink.SinkRecordConverter.hashcode;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
@@ -135,6 +145,60 @@ public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(getResult(read, splits, binaryRow(2), 0, STREAMING_ROW_TO_STRING)).isEmpty();
     }
 
+    @Test
+    public void testSequentialRead() throws Exception {
+        Random random = new Random();
+        int numOfBucket = Math.max(random.nextInt(8), 1);
+        FileStoreTable table = createFileStoreTable(numOfBucket);
+        RowDataSerializer serializer = new RowDataSerializer(table.schema().logicalRowType());
+        TableWrite write = table.newWrite();
+
+        TableCommit commit = table.newCommit("user");
+        List<Map<Integer, List<RowData>>> dataset = new ArrayList<>();
+        Map<Integer, List<RowData>> dataPerBucket = new HashMap<>(numOfBucket);
+        int numOfPartition = Math.max(random.nextInt(10), 1);
+        for (int i = 0; i < numOfPartition; i++) {
+            for (int j = 0; j < Math.max(random.nextInt(200), 1); j++) {
+                BinaryRowData data =
+                        serializer
+                                .toBinaryRow(
+                                        GenericRowData.of(i, random.nextInt(), random.nextLong()))
+                                .copy();
+                int bucket = bucket(hashcode(data), numOfBucket);
+                dataPerBucket.compute(
+                        bucket,
+                        (k, v) -> {
+                            if (v == null) {
+                                v = new ArrayList<>();
+                            }
+                            v.add(data);
+                            return v;
+                        });
+                write.write(data);
+            }
+            dataset.add(new HashMap<>(dataPerBucket));
+            dataPerBucket.clear();
+        }
+        commit.commit("0", write.prepareCommit(true));
+
+        int partition = random.nextInt(numOfPartition);
+        List<Integer> availableBucket = new ArrayList<>(dataset.get(partition).keySet());
+        int bucket = availableBucket.get(random.nextInt(availableBucket.size()));
+
+        Predicate partitionFilter =
+                new PredicateBuilder(table.schema().logicalRowType()).equal(0, partition);
+        List<Split> splits =
+                table.newScan().withFilter(partitionFilter).withBucket(bucket).plan().splits;
+        TableRead read = table.newRead();
+        getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING);
+
+        assertThat(getResult(read, splits, binaryRow(partition), bucket, STREAMING_ROW_TO_STRING))
+                .containsExactlyElementsOf(
+                        dataset.get(partition).get(bucket).stream()
+                                .map(STREAMING_ROW_TO_STRING)
+                                .collect(Collectors.toList()));
+    }
+
     private void writeData() throws Exception {
         FileStoreTable table = createFileStoreTable();
         TableWrite write = table.newWrite();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index a091c12c..d54f6703 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -215,8 +215,12 @@ public abstract class FileStoreTableTestBase {
         return b;
     }
 
+    protected FileStoreTable createFileStoreTable(int numOfBucket) throws Exception {
+        return createFileStoreTable(conf -> conf.set(BUCKET, numOfBucket));
+    }
+
     protected FileStoreTable createFileStoreTable() throws Exception {
-        return createFileStoreTable(conf -> {});
+        return createFileStoreTable(1);
     }
 
     protected abstract FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
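
Note on the bucket routing the new test depends on: the test predicts each row's bucket with the same SinkRecordConverter.hashcode/bucket helpers the write path uses, so writer and reader must agree on the mapping. The helper below is a minimal sketch of that contract, not SinkRecordConverter's actual body; the hash-modulo shape is an assumption based on how the test calls bucket(hashcode(data), numOfBucket).

    // Sketch only (assumed): the property the test relies on is that the same
    // row maps to the same bucket index in [0, numOfBucket) on write and read.
    static int bucket(int hashcode, int numOfBucket) {
        // Math.abs keeps the index non-negative for negative hash codes.
        return Math.abs(hashcode % numOfBucket);
    }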
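For callers outside this test, a hedged usage sketch of the new TableScan#withBucket: names are taken from the diff, and the table, predicate, and bucket setup are assumed to match testSequentialRead. Narrowing the plan to one bucket of an append-only table yields splits whose rows read back in append order, which is what the assertion above verifies.

    // Assumed setup: a FileStoreTable `table`, a partition predicate, and a
    // bucket index, as in testSequentialRead above.
    List<Split> splits =
            table.newScan()
                    .withFilter(partitionFilter) // prune partitions first
                    .withBucket(bucket)          // then keep only one bucket
                    .plan()
                    .splits;
    TableRead read = table.newRead(); // rows arrive in write order per bucket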