This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7cd6547a9027dfdc7ea97e496bb0e15213150529 Author: Yubin Li <lixin58...@163.com> AuthorDate: Tue Nov 7 14:13:22 2023 +0800 [FLINK-32993][table] DataGen connector handles length-constrained fields according to the schema definition by default This closes #23678 --- docs/content.zh/docs/connectors/table/datagen.md | 14 +- docs/content/docs/connectors/table/datagen.md | 18 ++- .../datagen/table/DataGenTableSourceFactory.java | 43 +++++- .../datagen/table/RandomGeneratorVisitor.java | 91 ++++-------- .../factories/DataGenTableSourceFactoryTest.java | 155 +++++++++++++++++---- 5 files changed, 223 insertions(+), 98 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/datagen.md b/docs/content.zh/docs/connectors/table/datagen.md index 210ccc088d2..642382a7768 100644 --- a/docs/content.zh/docs/connectors/table/datagen.md +++ b/docs/content.zh/docs/connectors/table/datagen.md @@ -39,9 +39,14 @@ DataGen 连接器是内置的,不需要额外的依赖项。 ----- 默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。 -对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。 还可以指定总行数,从而生成有界表。 +DataGen 连接器可以生成符合其 schema 的数据,应该注意的是,它按如下方式处理长度受限的字段: + +* 对于固定长度的数据类型(char、binary),字段长度只能由 schema 定义,且不支持自定义; +* 对于可变长度数据类型 (varchar、varbinary),字段默认长度由 schema 定义,且自定义长度不能大于 schema 定义; +* 对于超长字段(string、bytes),字段默认长度为 100,但可以定义为小于 2^31 的长度。 + 还支持序列生成器,您可以指定序列的起始和结束值。 如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。 @@ -294,7 +299,12 @@ CREATE TABLE Orders ( <td>可选</td> <td style="word-wrap: break-word;">100</td> <td>Integer</td> - <td>随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string、array、map、multiset。</td> + <td> + 随机生成器生成字符的长度,适用于 varchar、varbinary、string、bytes、array、map、multiset。 + 请注意对于可变长字段(varchar、varbinary),默认长度由 schema 定义,且长度不可设置为大于它; + 对于超长字段(string、bytes),默认长度是 100 且可设置为小于 2^31 的长度; + 对于结构化字段(数组、Map、多重集),默认元素数量为 3 且可以自定义。 + </td> </tr> <tr> <td><h5>fields.#.var-len</h5></td> diff --git a/docs/content/docs/connectors/table/datagen.md b/docs/content/docs/connectors/table/datagen.md index 
b4fc81a4280..70253786bff 100644 --- a/docs/content/docs/connectors/table/datagen.md +++ b/docs/content/docs/connectors/table/datagen.md @@ -39,9 +39,16 @@ Usage ----- By default, a DataGen table will create an unbounded number of rows with a random value for each column. -For types, char/varchar/binary/varbinary/string/array/map/multiset, the length can be specified. Additionally, a total number of rows can be specified, resulting in a bounded table. +The DataGen connector can generate data that conforms to its defined schema. It should be noted that it handles length-constrained fields as follows: + +* For fixed-length data types (char/binary), the field length can only be defined by the schema, +and does not support customization. +* For variable-length data types (varchar/varbinary), the field length is initially defined by the schema, +and the customized length cannot be greater than the schema definition. +* For super-long fields (string/bytes), the default length is 100, but can be set to a length less than 2^31. + There also exists a sequence generator, where users specify a sequence of start and end values. If any column in a table is a sequence type, the table will be bounded and end with the first sequence completes. @@ -77,7 +84,7 @@ WITH ( LIKE Orders (EXCLUDING ALL) ``` -Further more, for variable sized types, varchar/string/varbinary/bytes, you can specify whether to enable variable-length data generation. +Furthermore, for variable sized types, varchar/string/varbinary/bytes, you can specify whether to enable variable-length data generation. ```sql CREATE TABLE Orders ( @@ -296,7 +303,12 @@ Connector Options <td>optional</td> <td style="word-wrap: break-word;">100</td> <td>Integer</td> - <td>Size or length of the collection for generating char/varchar/binary/varbinary/string/array/map/multiset types.</td> + <td> + Size or length of the collection for generating varchar/varbinary/string/bytes/array/map/multiset types.
+ Please note that for variable-length fields (varchar/varbinary), the default length is defined by the schema and cannot be set to a length greater than it. + For super-long fields (string/bytes), the default length is 100 and can be set to a length less than 2^31. + For constructed fields (array/map/multiset), the default number of elements is 3 and can be customized. + </td> </tr> <tr> <td><h5>fields.#.var-len</h5></td> diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java index 0cb29d61990..72398c34b96 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java @@ -28,6 +28,8 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.VarBinaryType; import org.apache.flink.table.types.logical.VarCharType; @@ -141,7 +143,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { } private void validateFieldOptions(String name, DataType type, ReadableConfig options) { - ConfigOption<Boolean> lenOption = + ConfigOption<Boolean> varLenOption = key(DataGenConnectorOptionsUtil.FIELDS + "."
+ name @@ -149,7 +151,7 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { + DataGenConnectorOptionsUtil.VAR_LEN) .booleanType() .defaultValue(false); - options.getOptional(lenOption) + options.getOptional(varLenOption) .filter(option -> option) .ifPresent( option -> { @@ -158,10 +160,43 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { || logicalType instanceof VarBinaryType)) { throw new ValidationException( String.format( - "Only supports specifying '%s' option for variable-length types (varchar, string, varbinary, bytes). The type of field %s is not within this range.", - "fields.#." + DataGenConnectorOptionsUtil.VAR_LEN, + "Only supports specifying '%s' option for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field '%s' is not within this range.", + DataGenConnectorOptions.FIELD_VAR_LEN.key(), name)); + } + }); + + ConfigOption<Integer> lenOption = + key(DataGenConnectorOptionsUtil.FIELDS + + "." + + name + + "." + + DataGenConnectorOptionsUtil.LENGTH) + .intType() + .noDefaultValue(); + options.getOptional(lenOption) + .ifPresent( + option -> { + LogicalType logicalType = type.getLogicalType(); + if (logicalType instanceof CharType + || logicalType instanceof BinaryType) { + throw new ValidationException( + String.format( + "Custom length for fixed-length type (CHAR/BINARY) field '%s' is not supported.", name)); } + if (logicalType instanceof VarCharType + || logicalType instanceof VarBinaryType) { + int length = + logicalType instanceof VarCharType + ? 
((VarCharType) logicalType).getLength() + : ((VarBinaryType) logicalType).getLength(); + if (option > length) { + throw new ValidationException( + String.format( + "Custom length '%d' for variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field '%s' should be shorter than '%d' defined in the schema.", + option, name, length)); + } + } }); } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java index 896ad000bf4..254278471a7 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.flink.configuration.ConfigOptions.key; @@ -138,36 +139,22 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(CharType charType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." - + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_STRING_LENGTH_DEFAULT); ConfigOption<Float> nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( - getRandomStringGenerator(config.get(lenOption)).withNullRate(config.get(nr)), - lenOption, - nr); + getRandomStringGenerator(charType.getLength()).withNullRate(config.get(nr)), nr); } @Override public DataGeneratorContainer visit(VarCharType varCharType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." 
- + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_STRING_LENGTH_DEFAULT); + ConfigOption<Integer> lenOption = getLengthOption(varCharType::getLength); + int length = + config.get(lenOption) == VarCharType.MAX_LENGTH + ? RANDOM_STRING_LENGTH_DEFAULT + : config.get(lenOption); ConfigOption<Float> nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); ConfigOption<Boolean> varLenOption = varLen.booleanType().defaultValue(false); - return DataGeneratorContainer.of( - getRandomStringGenerator(config.get(lenOption)) + getRandomStringGenerator(length) .withNullRate(config.get(nr)) .withVarLen(config.get(varLenOption)), lenOption, @@ -177,30 +164,19 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(BinaryType binaryType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." - + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT); - return DataGeneratorContainer.of(getRandomBytesGenerator(config.get(lenOption)), lenOption); + return DataGeneratorContainer.of(getRandomBytesGenerator(binaryType.getLength())); } @Override public DataGeneratorContainer visit(VarBinaryType varBinaryType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." - + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT); + ConfigOption<Integer> lenOption = getLengthOption(varBinaryType::getLength); ConfigOption<Boolean> varLenOption = varLen.booleanType().defaultValue(false); + int length = + config.get(lenOption) == VarBinaryType.MAX_LENGTH + ? 
RANDOM_BYTES_LENGTH_DEFAULT + : config.get(lenOption); return DataGeneratorContainer.of( - getRandomBytesGenerator(config.get(lenOption)).withVarLen(config.get(varLenOption)), + getRandomBytesGenerator(length).withVarLen(config.get(varLenOption)), lenOption, varLenOption); } @@ -366,14 +342,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(ArrayType arrayType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." - + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT); + ConfigOption<Integer> lenOption = getLengthOption(() -> RANDOM_COLLECTION_LENGTH_DEFAULT); String fieldName = name + "." + "element"; DataGeneratorContainer container = @@ -390,14 +359,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(MultisetType multisetType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." - + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT); + ConfigOption<Integer> lenOption = getLengthOption(() -> RANDOM_COLLECTION_LENGTH_DEFAULT); String fieldName = name + "." + "element"; DataGeneratorContainer container = @@ -420,14 +382,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(MapType mapType) { - ConfigOption<Integer> lenOption = - key(DataGenConnectorOptionsUtil.FIELDS - + "." - + name - + "." - + DataGenConnectorOptionsUtil.LENGTH) - .intType() - .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT); + ConfigOption<Integer> lenOption = getLengthOption(() -> RANDOM_COLLECTION_LENGTH_DEFAULT); String keyName = name + "." + "key"; String valName = name + "." 
+ "value"; @@ -489,6 +444,16 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { throw new ValidationException("Unsupported type: " + logicalType); } + private ConfigOption<Integer> getLengthOption(Supplier<Integer> defaultLengthSupplier) { + return key(String.join( + ".", + DataGenConnectorOptionsUtil.FIELDS, + name, + DataGenConnectorOptionsUtil.LENGTH)) + .intType() + .defaultValue(defaultLengthSupplier.get()); + } + private static RandomGenerator<StringData> getRandomStringGenerator(int length) { return new RandomGenerator<StringData>() { @Override diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java index e137749fcfd..882ec105f01 100644 --- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java +++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java @@ -41,6 +41,8 @@ import org.apache.flink.util.InstantiationUtil; import org.junit.jupiter.api.Test; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -65,6 +67,13 @@ class DataGenTableSourceFactoryTest { Column.physical("f5", DataTypes.VARBINARY(4)), Column.physical("f6", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING())), Column.physical("f7", DataTypes.STRING())); + private static final ResolvedSchema LENGTH_CONSTRAINED_SCHEMA = + ResolvedSchema.of( + Column.physical("f0", DataTypes.CHAR(50)), + Column.physical("f1", DataTypes.BINARY(40)), + Column.physical("f2", DataTypes.VARCHAR(30)), + Column.physical("f3", DataTypes.VARBINARY(20)), + Column.physical("f4", DataTypes.STRING())); @Test void testDataTypeCoverage() throws Exception { @@ -172,13 +181,6 @@ class 
DataGenTableSourceFactoryTest { descriptor.putString( DataGenConnectorOptionsUtil.FIELDS + ".f3." + DataGenConnectorOptionsUtil.MAX_PAST, "5s"); - - descriptor.putString( - DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND, - DataGenConnectorOptionsUtil.RANDOM); - descriptor.putLong( - DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.LENGTH, - 2); descriptor.putString( DataGenConnectorOptionsUtil.FIELDS + ".f5." + DataGenConnectorOptionsUtil.KIND, DataGenConnectorOptionsUtil.SEQUENCE); @@ -237,12 +239,10 @@ class DataGenTableSourceFactoryTest { for (RowData row : results) { assertThat(row.getString(0).toString()) .hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT); - assertThat(row.getString(1).toString()) - .hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT); + assertThat(row.getString(1).toString()).hasSize(20); assertThat(row.getBinary(2)) .hasSize(RandomGeneratorVisitor.RANDOM_BYTES_LENGTH_DEFAULT); - assertThat(row.getBinary(3)) - .hasSize(RandomGeneratorVisitor.RANDOM_BYTES_LENGTH_DEFAULT); + assertThat(row.getBinary(3)).hasSize(4); } descriptor.putBoolean( @@ -283,22 +283,85 @@ class DataGenTableSourceFactoryTest { assertThat(sizeVarBinary.size()).isGreaterThan(1); assertThat(sizeVarChar.size()).isGreaterThan(1); - assertThatThrownBy( - () -> { - descriptor.putBoolean( - DataGenConnectorOptionsUtil.FIELDS - + ".f4." - + DataGenConnectorOptionsUtil.VAR_LEN, - true); + assertException( + schema, + descriptor, + "f4", + null, + true, + String.format( + "Only supports specifying '%s' option for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field '%s' is not within this range.", + DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4")); + } - runGenerator(schema, descriptor); - }) - .satisfies( - anyCauseMatches( - ValidationException.class, - String.format( - "Only supports specifying '%s' option for variable-length types (varchar, string, varbinary, bytes). 
The type of field %s is not within this range.", - DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4"))); + @Test + void testVariableLengthDataType() throws Exception { + DescriptorProperties descriptor = new DescriptorProperties(); + final int rowsNumber = 200; + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + + List<RowData> results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); + assertThat(results).hasSize(rowsNumber); + + for (RowData row : results) { + assertThat(row.getString(2).toString()).hasSize(30); + assertThat(row.getBinary(3)).hasSize(20); + assertThat(row.getString(4).toString()) + .hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT); + } + + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.RANDOM); + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f2." + DataGenConnectorOptionsUtil.LENGTH, + 25); + descriptor.putString( + DataGenConnectorOptionsUtil.FIELDS + ".f4." + DataGenConnectorOptionsUtil.KIND, + DataGenConnectorOptionsUtil.RANDOM); + descriptor.putLong( + DataGenConnectorOptionsUtil.FIELDS + ".f4." 
+ DataGenConnectorOptionsUtil.LENGTH, + 9999); + + results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); + + for (RowData row : results) { + assertThat(row.getString(2).toString()).hasSize(25); + assertThat(row.getString(4).toString()).hasSize(9999); + } + + assertException( + LENGTH_CONSTRAINED_SCHEMA, + descriptor, + "f3", + 21, + null, + "Custom length '21' for variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined in the schema."); + } + + @Test + void testFixedLengthDataType() throws Exception { + DescriptorProperties descriptor = new DescriptorProperties(); + final int rowsNumber = 200; + descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); + descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), rowsNumber); + + List<RowData> results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor); + assertThat(results).hasSize(rowsNumber); + + for (RowData row : results) { + assertThat(row.getString(0).toString()).hasSize(50); + assertThat(row.getBinary(1)).hasSize(40); + } + + assertException( + LENGTH_CONSTRAINED_SCHEMA, + descriptor, + "f0", + 20, + null, + "Custom length for fixed-length type (CHAR/BINARY) field 'f0' is not supported."); } private List<RowData> runGenerator(ResolvedSchema schema, DescriptorProperties descriptor) @@ -518,6 +581,46 @@ class DataGenTableSourceFactoryTest { anyCauseMatches("Could not parse value 'Wrong' for key 'fields.f0.start'")); } + private void assertException( + ResolvedSchema schema, + DescriptorProperties descriptor, + String fieldName, + @Nullable Integer len, + @Nullable Boolean varLen, + String expectedMessage) { + assertThatThrownBy( + () -> { + descriptor.putString( + String.join( + ".", + DataGenConnectorOptionsUtil.FIELDS, + fieldName, + DataGenConnectorOptionsUtil.KIND), + DataGenConnectorOptionsUtil.RANDOM); + if (len != null) { + descriptor.putLong( + String.join( + ".", + DataGenConnectorOptionsUtil.FIELDS, + fieldName, + 
DataGenConnectorOptionsUtil.LENGTH), + len); + } + if (varLen != null) { + descriptor.putBoolean( + String.join( + ".", + DataGenConnectorOptionsUtil.FIELDS, + fieldName, + DataGenConnectorOptionsUtil.VAR_LEN), + varLen); + } + + runGenerator(schema, descriptor); + }) + .satisfies(anyCauseMatches(ValidationException.class, expectedMessage)); + } + private static class TestContext implements SourceFunction.SourceContext<RowData> { private final Object lock = new Object();