LadyForest commented on code in PR #23678:
URL: https://github.com/apache/flink/pull/23678#discussion_r1410315513


##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##########
@@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer(
             String name, DataType type, String kind, ReadableConfig options) {
         switch (kind) {
             case DataGenConnectorOptionsUtil.RANDOM:
+                validateFieldOptions(name, type, options);
                 return type.getLogicalType().accept(new 
RandomGeneratorVisitor(name, options));
             case DataGenConnectorOptionsUtil.SEQUENCE:
                 return type.getLogicalType().accept(new 
SequenceGeneratorVisitor(name, options));
             default:
                 throw new ValidationException("Unsupported generator kind: " + 
kind);
         }
     }
+
+    private void validateFieldOptions(String name, DataType type, 
ReadableConfig options) {
+        ConfigOption<Integer> lenOption =
+                key(DataGenConnectorOptionsUtil.FIELDS
+                                + "."
+                                + name
+                                + "."
+                                + DataGenConnectorOptionsUtil.LENGTH)
+                        .intType()
+                        .noDefaultValue();
+        options.getOptional(lenOption)
+                .ifPresent(
+                        option -> {
+                            LogicalType logicalType = type.getLogicalType();
+                            if (logicalType instanceof CharType
+                                    || logicalType instanceof BinaryType) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "User-defined length of the 
fixed-length field %s is not supported.",

Review Comment:
   Nit: Custom length for fixed-length type field '%s' is not supported.



##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java:
##########
@@ -127,11 +132,53 @@ private DataGeneratorContainer createContainer(
             String name, DataType type, String kind, ReadableConfig options) {
         switch (kind) {
             case DataGenConnectorOptionsUtil.RANDOM:
+                validateFieldOptions(name, type, options);
                 return type.getLogicalType().accept(new 
RandomGeneratorVisitor(name, options));
             case DataGenConnectorOptionsUtil.SEQUENCE:
                 return type.getLogicalType().accept(new 
SequenceGeneratorVisitor(name, options));
             default:
                 throw new ValidationException("Unsupported generator kind: " + 
kind);
         }
     }
+
+    private void validateFieldOptions(String name, DataType type, 
ReadableConfig options) {
+        ConfigOption<Integer> lenOption =
+                key(DataGenConnectorOptionsUtil.FIELDS
+                                + "."
+                                + name
+                                + "."
+                                + DataGenConnectorOptionsUtil.LENGTH)
+                        .intType()
+                        .noDefaultValue();
+        options.getOptional(lenOption)
+                .ifPresent(
+                        option -> {
+                            LogicalType logicalType = type.getLogicalType();
+                            if (logicalType instanceof CharType
+                                    || logicalType instanceof BinaryType) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "User-defined length of the 
fixed-length field %s is not supported.",
+                                                name));
+                            }
+                            if (logicalType instanceof VarCharType) {
+                                int length = ((VarCharType) 
logicalType).getLength();
+                                if (option > length) {
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "User-defined length of 
the VARCHAR field %s should be shorter than the schema definition.",
+                                                    name));
+                                }
+                            }
+                            if (logicalType instanceof VarBinaryType) {
+                                int length = ((VarBinaryType) 
logicalType).getLength();
+                                if (option > length) {
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "User-defined length of 
the VARBINARY field %s should be shorter than the schema definition.",
+                                                    name));
+                                }
+                            }
+                        });
+    }

Review Comment:
   Nit: I think we don't need to differentiate the `VARCHAR` v.s. `VARBINARY` 
here.
   
   How about
   ```java
   String.format("Custom length '%d' for variable-length type VARCHAR/VARBINARY 
should be shorter than '%d' defined in the schema.", option, length);
   ```
   



##########
flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java:
##########
@@ -215,6 +216,100 @@ void testSource() throws Exception {
         }
     }
 
+    @Test
+    void testVariableLengthDataType() throws Exception {
+        DescriptorProperties descriptor = new DescriptorProperties();

Review Comment:
   Nit: `DescriptorProperties` is deprecated already. If you're willing, you 
can open a new subtask under FLINK-31596 to migrate the usage to 
`CatalogPropertiesUtil`.



##########
flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java:
##########
@@ -152,25 +142,20 @@ public DataGeneratorContainer visit(VarCharType 
varCharType) {
                                 + "."
                                 + DataGenConnectorOptionsUtil.LENGTH)
                         .intType()
-                        .defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+                        .defaultValue(varCharType.getLength());

Review Comment:
   Nit: maybe we can extract a util method like 
   ```java
   private ConfigOption<Integer> getLengthOption(Supplier<Integer> 
defaultLengthSupplier) {
           return key(String.join(
                           ".",
                           DataGenConnectorOptionsUtil.FIELDS,
                           name,
                           DataGenConnectorOptionsUtil.LENGTH))
                   .intType()
                   .defaultValue(defaultLengthSupplier.get());
       }
   ```
   ```java
   @Override
       public DataGeneratorContainer visit(VarCharType varCharType) {
           ConfigOption<Integer> lenOption = 
getLengthOption(varCharType::getLength);
           int length =
                   config.get(lenOption) == VarCharType.MAX_LENGTH
                           ? RANDOM_STRING_LENGTH_DEFAULT
                           : config.get(lenOption);
   
           ConfigOption<Float> nr = 
nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
           return DataGeneratorContainer.of(
                   
getRandomStringGenerator(length).withNullRate(config.get(nr)), lenOption, nr);
       }
   
   @Override
       public DataGeneratorContainer visit(VarBinaryType varBinaryType) {
           ConfigOption<Integer> lenOption = 
getLengthOption(varBinaryType::getLength);
           int length =
                   config.get(lenOption) == VarBinaryType.MAX_LENGTH
                           ? RANDOM_BYTES_LENGTH_DEFAULT
                           : config.get(lenOption);
           return DataGeneratorContainer.of(getRandomBytesGenerator(length), 
lenOption);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to