LadyForest commented on code in PR #23678: URL: https://github.com/apache/flink/pull/23678#discussion_r1410315513
########## flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java: ########## @@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer( String name, DataType type, String kind, ReadableConfig options) { switch (kind) { case DataGenConnectorOptionsUtil.RANDOM: + validateFieldOptions(name, type, options); return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options)); case DataGenConnectorOptionsUtil.SEQUENCE: return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options)); default: throw new ValidationException("Unsupported generator kind: " + kind); } } + + private void validateFieldOptions(String name, DataType type, ReadableConfig options) { + ConfigOption<Integer> lenOption = + key(DataGenConnectorOptionsUtil.FIELDS + + "." + + name + + "." + + DataGenConnectorOptionsUtil.LENGTH) + .intType() + .noDefaultValue(); + options.getOptional(lenOption) + .ifPresent( + option -> { + LogicalType logicalType = type.getLogicalType(); + if (logicalType instanceof CharType + || logicalType instanceof BinaryType) { + throw new ValidationException( + String.format( + "User-defined length of the fixed-length field %s is not supported.", Review Comment: Nit: Custom length for fixed-length type field '%s' is not supported. 
########## flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java: ########## @@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer( String name, DataType type, String kind, ReadableConfig options) { switch (kind) { case DataGenConnectorOptionsUtil.RANDOM: + validateFieldOptions(name, type, options); return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options)); case DataGenConnectorOptionsUtil.SEQUENCE: return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options)); default: throw new ValidationException("Unsupported generator kind: " + kind); } } + + private void validateFieldOptions(String name, DataType type, ReadableConfig options) { + ConfigOption<Integer> lenOption = + key(DataGenConnectorOptionsUtil.FIELDS + + "." + + name + + "." + + DataGenConnectorOptionsUtil.LENGTH) + .intType() + .noDefaultValue(); + options.getOptional(lenOption) + .ifPresent( + option -> { + LogicalType logicalType = type.getLogicalType(); + if (logicalType instanceof CharType + || logicalType instanceof BinaryType) { + throw new ValidationException( + String.format( + "User-defined length of the fixed-length field %s is not supported.", + name)); + } + if (logicalType instanceof VarCharType) { + int length = ((VarCharType) logicalType).getLength(); + if (option > length) { + throw new ValidationException( + String.format( + "User-defined length of the VARCHAR field %s should be shorter than the schema definition.", + name)); + } + } + if (logicalType instanceof VarBinaryType) { + int length = ((VarBinaryType) logicalType).getLength(); + if (option > length) { + throw new ValidationException( + String.format( + "User-defined length of the VARBINARY field %s should be shorter than the schema definition.", + name)); + } + } + }); + } Review Comment: Nit: I think we don't need to differentiate between `VARCHAR` and `VARBINARY` here. 
How about ```java String.format("Custom length '%d' for variable-length type VARCHAR/VARBINARY should be shorter than '%d' defined in the schema.", option, length); ``` ########## flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java: ########## @@ -215,6 +216,100 @@ void testSource() throws Exception { } } + @Test + void testVariableLengthDataType() throws Exception { + DescriptorProperties descriptor = new DescriptorProperties(); Review Comment: Nit: `DescriptorProperties` is deprecated already. If you're willing, you can open a new subtask under FLINK-31596 to migrate the usage to `CatalogPropertiesUtil`. ########## flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java: ########## @@ -152,25 +142,20 @@ public DataGeneratorContainer visit(VarCharType varCharType) { + "." + DataGenConnectorOptionsUtil.LENGTH) .intType() - .defaultValue(RANDOM_STRING_LENGTH_DEFAULT); + .defaultValue(varCharType.getLength()); Review Comment: Nit: maybe we can extract a util method like ```java private ConfigOption<Integer> getLengthOption(Supplier<Integer> defaultLengthSupplier) { return key(String.join( ".", DataGenConnectorOptionsUtil.FIELDS, name, DataGenConnectorOptionsUtil.LENGTH)) .intType() .defaultValue(defaultLengthSupplier.get()); } ``` ```java @Override public DataGeneratorContainer visit(VarCharType varCharType) { ConfigOption<Integer> lenOption = getLengthOption(varCharType::getLength); int length = config.get(lenOption) == VarCharType.MAX_LENGTH ? 
RANDOM_STRING_LENGTH_DEFAULT : config.get(lenOption); ConfigOption<Float> nr = nullRate.floatType().defaultValue(NULL_RATE_DEFAULT); return DataGeneratorContainer.of( getRandomStringGenerator(length).withNullRate(config.get(nr)), lenOption, nr); } @Override public DataGeneratorContainer visit(VarBinaryType varBinaryType) { ConfigOption<Integer> lenOption = getLengthOption(varBinaryType::getLength); int length = config.get(lenOption) == VarBinaryType.MAX_LENGTH ? RANDOM_BYTES_LENGTH_DEFAULT : config.get(lenOption); return DataGeneratorContainer.of(getRandomBytesGenerator(length), lenOption); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org