This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
     new 907747d3  [FLINK-28017] Introduce bucket-key to table store
907747d3 is described below

commit 907747d3e37b7edb5c1bec5209e47bd4ff9f737b
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Jul 5 15:47:44 2022 +0800

    [FLINK-28017] Introduce bucket-key to table store

    This closes #193
---
 .../connector/sink/BucketStreamPartitioner.java    |  3 +-
 .../org/apache/flink/table/store/CoreOptions.java  | 17 ++++
 .../flink/table/store/file/schema/TableSchema.java | 34 +++++++-
 .../store/table/sink/SinkRecordConverter.java      | 29 +++++--
 .../store/table/sink/SinkRecordConverterTest.java  | 92 ++++++++++++++++++++++
 5 files changed, 167 insertions(+), 8 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
index 774d2a1d..cd640c43 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/BucketStreamPartitioner.java
@@ -48,8 +48,7 @@ public class BucketStreamPartitioner extends StreamPartitioner<RowData> {
     @Override
     public int selectChannel(SerializationDelegate<StreamRecord<RowData>> record) {
         RowData row = record.getInstance().getValue();
-        int bucket = recordConverter.bucket(row, recordConverter.primaryKey(row));
-        return bucket % numberOfChannels;
+        return recordConverter.bucket(row) % numberOfChannels;
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 88a32d80..afc3c498 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -52,6 +52,23 @@ public class CoreOptions implements Serializable {
                     .defaultValue(1)
                     .withDescription("Bucket number for file store.");
 
+    public static final ConfigOption<String> BUCKET_KEY =
+            ConfigOptions.key("bucket-key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Specify the table store distribution policy. Data is assigned"
+                                                    + " to each bucket according to the hash value of bucket-key.")
+                                    .linebreak()
+                                    .text("If you specify multiple fields, the delimiter is ','.")
+                                    .linebreak()
+                                    .text(
+                                            "If not specified, the primary key will be used; "
+                                                    + "if there is no primary key, the full row will be used.")
+                                    .build());
+
     @Internal
     public static final ConfigOption<String> PATH =
             ConfigOptions.key("path")
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 89d7ce03..2a6c6b34 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -25,9 +25,11 @@ import org.apache.flink.table.types.logical.MapType;
 import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -37,6 +39,8 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+
 /** Schema of a table. */
 public class TableSchema implements Serializable {
 
@@ -116,7 +120,8 @@ public class TableSchema implements Serializable {
         Preconditions.checkState(
                 adjusted.size() > 0,
                 String.format(
-                        "Primary key constraint %s should not be same with partition fields %s, this will result in only one record in a partition",
+                        "Primary key constraint %s should not be the same as partition fields %s;"
+                                + " this will result in only one record in a partition",
                         primaryKeys, partitionKeys));
 
         return adjusted;
@@ -129,6 +134,33 @@
         return options;
     }
 
+    public List<String> bucketKeys() {
+        String key = options.get(BUCKET_KEY.key());
+        if (StringUtils.isNullOrWhitespaceOnly(key)) {
+            return Collections.emptyList();
+        }
+        List<String> bucketKeys = Arrays.asList(key.split(","));
+        if (!containsAll(fieldNames(), bucketKeys)) {
+            throw new RuntimeException(
+                    String.format(
+                            "Field names %s should contain all bucket keys %s.",
+                            fieldNames(), bucketKeys));
+        }
+        if (primaryKeys.size() > 0) {
+            if (!containsAll(primaryKeys, bucketKeys)) {
+                throw new RuntimeException(
+                        String.format(
+                                "Primary keys %s should contain all bucket keys %s.",
+                                primaryKeys, bucketKeys));
+            }
+        }
+        return bucketKeys;
+    }
+
+    private boolean containsAll(List<String> all, List<String> contains) {
+        return new HashSet<>(all).containsAll(new HashSet<>(contains));
+    }
+
     public String comment() {
         return comment;
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 40b99afb..9d926ba5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -40,6 +40,8 @@ public class SinkRecordConverter {
 
     private final Projection<RowData, BinaryRowData> partProjection;
 
+    private final Projection<RowData, BinaryRowData> bucketProjection;
+
     private final Projection<RowData, BinaryRowData> pkProjection;
 
     @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
@@ -49,14 +51,16 @@ public class SinkRecordConverter {
                 numBucket,
                 tableSchema.logicalRowType(),
                 tableSchema.projection(tableSchema.partitionKeys()),
+                tableSchema.projection(tableSchema.bucketKeys()),
                 tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
                 tableSchema.projection(tableSchema.primaryKeys()));
     }
 
-    public SinkRecordConverter(
+    private SinkRecordConverter(
             int numBucket,
             RowType inputType,
             int[] partitions,
+            int[] bucketKeys,
             int[] primaryKeys,
             int[] logPrimaryKeys) {
         this.numBucket = numBucket;
@@ -64,6 +68,7 @@ public class SinkRecordConverter {
                 CodeGenUtils.newProjection(
                         inputType, IntStream.range(0, inputType.getFieldCount()).toArray());
         this.partProjection = CodeGenUtils.newProjection(inputType, partitions);
+        this.bucketProjection = CodeGenUtils.newProjection(inputType, bucketKeys);
         this.pkProjection = CodeGenUtils.newProjection(inputType, primaryKeys);
         this.logPkProjection =
                 Arrays.equals(primaryKeys, logPrimaryKeys)
@@ -74,7 +79,7 @@
     public SinkRecord convert(RowData row) {
         BinaryRowData partition = partProjection.apply(row);
         BinaryRowData primaryKey = primaryKey(row);
-        int bucket = bucket(row, primaryKey);
+        int bucket = bucket(row, bucketKey(row, primaryKey));
         return new SinkRecord(partition, bucket, primaryKey, row);
     }
 
@@ -86,17 +91,31 @@
         return new SinkRecord(record.partition(), record.bucket(), logPrimaryKey, record.row());
     }
 
-    public BinaryRowData primaryKey(RowData row) {
+    public int bucket(RowData row) {
+        return bucket(row, bucketKey(row));
+    }
+
+    private BinaryRowData primaryKey(RowData row) {
         return pkProjection.apply(row);
     }
 
+    private BinaryRowData bucketKey(RowData row) {
+        BinaryRowData bucketKey = bucketProjection.apply(row);
+        return bucketKey.getArity() == 0 ? pkProjection.apply(row) : bucketKey;
+    }
+
+    private BinaryRowData bucketKey(RowData row, BinaryRowData primaryKey) {
+        BinaryRowData bucketKey = bucketProjection.apply(row);
+        return bucketKey.getArity() == 0 ? primaryKey : bucketKey;
+    }
+
     private BinaryRowData logPrimaryKey(RowData row) {
         assert logPkProjection != null;
         return logPkProjection.apply(row);
     }
 
-    public int bucket(RowData row, BinaryRowData primaryKey) {
-        int hash = primaryKey.getArity() == 0 ? hashRow(row) : primaryKey.hashCode();
+    private int bucket(RowData row, BinaryRowData bucketKey) {
+        int hash = bucketKey.getArity() == 0 ? hashRow(row) : bucketKey.hashCode();
         return Math.abs(hash % numBucket);
     }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
new file mode 100644
index 00000000..eea054d4
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test for {@link SinkRecordConverter}. */
+public class SinkRecordConverterTest {
+
+    @Test
+    public void testInvalidBucket() {
+        assertThatThrownBy(() -> converter("n", "b"))
+                .hasMessageContaining("Field names [a, b, c] should contain all bucket keys [n].");
+
+        assertThatThrownBy(() -> converter("a", "b"))
+                .hasMessageContaining("Primary keys [b] should contain all bucket keys [a].");
+    }
+
+    @Test
+    public void testBucket() {
+        GenericRowData row = GenericRowData.of(5, 6, 7);
+        assertThat(bucket(converter("a", "a,b"), row)).isEqualTo(96);
+        assertThat(bucket(converter("", "a"), row)).isEqualTo(96);
+        assertThat(bucket(converter("", "a,b"), row)).isEqualTo(27);
+        assertThat(bucket(converter("a,b", "a,b"), row)).isEqualTo(27);
+        assertThat(bucket(converter("", ""), row)).isEqualTo(40);
+        assertThat(bucket(converter("a,b,c", ""), row)).isEqualTo(40);
+        assertThat(bucket(converter("", "a,b,c"), row)).isEqualTo(40);
+    }
+
+    private int bucket(SinkRecordConverter converter, RowData row) {
+        int bucket1 = converter.bucket(row);
+        int bucket2 = converter.convert(row).bucket();
+        assertThat(bucket1).isEqualTo(bucket2);
+        return bucket1;
+    }
+
+    private SinkRecordConverter converter(String bk, String pk) {
+        RowType rowType =
+                new RowType(
+                        Arrays.asList(
+                                new RowType.RowField("a", new IntType()),
+                                new RowType.RowField("b", new IntType()),
+                                new RowType.RowField("c", new IntType())));
+        List<DataField> fields = TableSchema.newFields(rowType);
+        Map<String, String> options = new HashMap<>();
+        options.put(BUCKET_KEY.key(), bk);
+        TableSchema schema =
+                new TableSchema(
+                        0,
+                        fields,
+                        TableSchema.currentHighestFieldId(fields),
+                        Collections.emptyList(),
+                        "".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
+                        options,
+                        "");
+        return new SinkRecordConverter(100, schema);
+    }
+}
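
For readers skimming the patch, the bucket assignment it introduces reduces to a small fallback chain: hash the configured bucket-key fields if present, otherwise the primary key fields, otherwise the full row, then take Math.abs(hash % numBucket). The standalone Java sketch below restates that chain under simplified assumptions: the class BucketSketch and its Object[] row model are invented here for illustration, whereas the real converter hashes codegen'd BinaryRowData projections, so concrete bucket numbers will differ from the expected values in the test above.

import java.util.Arrays;
import java.util.List;

public class BucketSketch {

    // Mirrors the fallback in SinkRecordConverter#bucketKey / #bucket:
    // bucket-key fields -> primary key fields -> whole row.
    static int bucket(Object[] row, List<?> bucketKey, List<?> primaryKey, int numBucket) {
        List<?> key = bucketKey.isEmpty() ? primaryKey : bucketKey;
        int hash = key.isEmpty() ? Arrays.hashCode(row) : key.hashCode();
        // Same normalization as the patch: keep the bucket id non-negative.
        return Math.abs(hash % numBucket);
    }

    public static void main(String[] args) {
        Object[] row = {5, 6, 7};
        // bucket-key = "a": only field a feeds the hash.
        System.out.println(bucket(row, List.of(5), List.of(5, 6), 100));
        // No bucket-key: the primary key (a, b) feeds the hash.
        System.out.println(bucket(row, List.of(), List.of(5, 6), 100));
        // Neither bucket-key nor primary key: the whole row feeds the hash.
        System.out.println(bucket(row, List.of(), List.of(), 100));
    }
}

This also explains why BucketStreamPartitioner can now call recordConverter.bucket(row) with no separate primary-key argument: the converter resolves the effective key internally, so every writer picks the same bucket for a given row regardless of which keys the table declares.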