This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7cd6547a9027dfdc7ea97e496bb0e15213150529
Author: Yubin Li <lixin58...@163.com>
AuthorDate: Tue Nov 7 14:13:22 2023 +0800

    [FLINK-32993][table] DataGen connector handles length-constrained fields 
according to the schema definition by default
    
    This closes #23678
---
 docs/content.zh/docs/connectors/table/datagen.md   |  14 +-
 docs/content/docs/connectors/table/datagen.md      |  18 ++-
 .../datagen/table/DataGenTableSourceFactory.java   |  43 +++++-
 .../datagen/table/RandomGeneratorVisitor.java      |  91 ++++--------
 .../factories/DataGenTableSourceFactoryTest.java   | 155 +++++++++++++++++----
 5 files changed, 223 insertions(+), 98 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/datagen.md 
b/docs/content.zh/docs/connectors/table/datagen.md
index 210ccc088d2..642382a7768 100644
--- a/docs/content.zh/docs/connectors/table/datagen.md
+++ b/docs/content.zh/docs/connectors/table/datagen.md
@@ -39,9 +39,14 @@ DataGen 连接器是内置的,不需要额外的依赖项。
 -----
 
 默认情况下,DataGen 表将创建无限数量的行,每列都有一个随机值。
-对于 char、varchar、binary、varbinary、string、array、map 和 multiset 类型,可以指定长度。
 还可以指定总行数,从而生成有界表。
 
+DataGen 连接器可以生成符合其 schema 的数据,应该注意的是,它按如下方式处理长度受限的字段:
+
+* 对于固定长度的数据类型(char、binary),字段长度只能由 schema 定义,且不支持自定义;
+* 对于可变长度数据类型 (varchar、varbinary),字段默认长度由 schema 定义,且自定义长度不能大于 schema 定义;
+* 对于超长字段(string、bytes),字段默认长度为 100,但可以定义为小于 2^31 的长度。
+
 还支持序列生成器,您可以指定序列的起始和结束值。
 如果表中有任一列是序列类型,则该表将是有界的,并在第一个序列完成时结束。
 
@@ -294,7 +299,12 @@ CREATE TABLE Orders (
       <td>可选</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
-      <td>随机生成器生成字符的长度,适用于 
char、varchar、binary、varbinary、string、array、map、multiset。</td>
+      <td>
+          随机生成器生成字符的长度,适用于 varchar、varbinary、string、bytes、array、map、multiset。
+          请注意对于可变长字段(varchar、varbinary),默认长度由 schema 定义,且长度不可设置为大于它;
+          对于超长字段(string、bytes),默认长度是 100 且可设置为小于 2^31 的长度;
+          对于结构化字段(数组、Map、多重集),默认元素数量为 3 且可以自定义。
+      </td>
     </tr>
     <tr>
       <td><h5>fields.#.var-len</h5></td>
diff --git a/docs/content/docs/connectors/table/datagen.md 
b/docs/content/docs/connectors/table/datagen.md
index b4fc81a4280..70253786bff 100644
--- a/docs/content/docs/connectors/table/datagen.md
+++ b/docs/content/docs/connectors/table/datagen.md
@@ -39,9 +39,16 @@ Usage
 -----
 
 By default, a DataGen table will create an unbounded number of rows with a 
random value for each column.
-For types, char/varchar/binary/varbinary/string/array/map/multiset, the length 
can be specified.
 Additionally, a total number of rows can be specified, resulting in a bounded 
table.
 
+The DataGen connector can generate data that conforms to its defined schema. 
It should be noted that it handles length-constrained fields as follows:
+
+* For fixed-length data types (char/binary), the field length can only be 
defined by the schema, 
+and does not support customization.
+* For variable-length data types (varchar/varbinary), the field length 
defaults to the length defined by the schema, 
+and a customized length cannot be greater than the schema definition.
+* For super-long fields (string/bytes), the default length is 100, but can be 
set to a length less than 2^31.
+
 There also exists a sequence generator, where users specify a sequence of 
start and end values.
 If any column in a table is a sequence type, the table will be bounded and end 
with the first sequence completes.
 
@@ -77,7 +84,7 @@ WITH (
 LIKE Orders (EXCLUDING ALL)
 ```
 
-Further more, for variable sized types, varchar/string/varbinary/bytes, you 
can specify whether to enable variable-length data generation.
+Furthermore, for variable sized types, varchar/string/varbinary/bytes, you can 
specify whether to enable variable-length data generation.
 
 ```sql
 CREATE TABLE Orders (
@@ -296,7 +303,12 @@ Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">100</td>
       <td>Integer</td>
-      <td>Size or length of the collection for generating 
char/varchar/binary/varbinary/string/array/map/multiset types.</td>
+      <td>
+          Size or length of the collection for generating 
varchar/varbinary/string/bytes/array/map/multiset types. 
+          Please note that for variable-length fields (varchar/varbinary), 
the default length is defined by the schema and cannot be set to a length 
greater than it.
+          For super-long fields (string/bytes), the default length is 100 and 
can be set to a length less than 2^31.
+          For constructed fields (array/map/multiset), the default number of 
elements is 3 and can be customized.
+      </td>
     </tr>
     <tr>
       <td><h5>fields.#.var-len</h5></td>
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
index 0cb29d61990..72398c34b96 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/DataGenTableSourceFactory.java
@@ -28,6 +28,8 @@ import 
org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -141,7 +143,7 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
     }
 
     private void validateFieldOptions(String name, DataType type, 
ReadableConfig options) {
-        ConfigOption<Boolean> lenOption =
+        ConfigOption<Boolean> varLenOption =
                 key(DataGenConnectorOptionsUtil.FIELDS
                                 + "."
                                 + name
@@ -149,7 +151,7 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
                                 + DataGenConnectorOptionsUtil.VAR_LEN)
                         .booleanType()
                         .defaultValue(false);
-        options.getOptional(lenOption)
+        options.getOptional(varLenOption)
                 .filter(option -> option)
                 .ifPresent(
                         option -> {
@@ -158,10 +160,43 @@ public class DataGenTableSourceFactory implements 
DynamicTableSourceFactory {
                                     || logicalType instanceof VarBinaryType)) {
                                 throw new ValidationException(
                                         String.format(
-                                                "Only supports specifying '%s' 
option for variable-length types (varchar, string, varbinary, bytes). The type 
of field %s is not within this range.",
-                                                "fields.#." + 
DataGenConnectorOptionsUtil.VAR_LEN,
+                                                "Only supports specifying '%s' 
option for variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of 
field '%s' is not within this range.",
+                                                
DataGenConnectorOptions.FIELD_VAR_LEN.key(), name));
+                            }
+                        });
+
+        ConfigOption<Integer> lenOption =
+                key(DataGenConnectorOptionsUtil.FIELDS
+                                + "."
+                                + name
+                                + "."
+                                + DataGenConnectorOptionsUtil.LENGTH)
+                        .intType()
+                        .noDefaultValue();
+        options.getOptional(lenOption)
+                .ifPresent(
+                        option -> {
+                            LogicalType logicalType = type.getLogicalType();
+                            if (logicalType instanceof CharType
+                                    || logicalType instanceof BinaryType) {
+                                throw new ValidationException(
+                                        String.format(
+                                                "Custom length for 
fixed-length type (CHAR/BINARY) field '%s' is not supported.",
                                                 name));
                             }
+                            if (logicalType instanceof VarCharType
+                                    || logicalType instanceof VarBinaryType) {
+                                int length =
+                                        logicalType instanceof VarCharType
+                                                ? ((VarCharType) 
logicalType).getLength()
+                                                : ((VarBinaryType) 
logicalType).getLength();
+                                if (option > length) {
+                                    throw new ValidationException(
+                                            String.format(
+                                                    "Custom length '%d' for 
variable-length type (VARCHAR/STRING/VARBINARY/BYTES) field '%s' should be 
shorter than '%d' defined in the schema.",
+                                                    option, name, length));
+                                }
+                            }
                         });
     }
 }
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
index 896ad000bf4..254278471a7 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/RandomGeneratorVisitor.java
@@ -60,6 +60,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
@@ -138,36 +139,22 @@ public class RandomGeneratorVisitor extends 
DataGenVisitorBase {
 
     @Override
     public DataGeneratorContainer visit(CharType charType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
         ConfigOption<Float> nr = 
nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
         return DataGeneratorContainer.of(
-                
getRandomStringGenerator(config.get(lenOption)).withNullRate(config.get(nr)),
-                lenOption,
-                nr);
+                
getRandomStringGenerator(charType.getLength()).withNullRate(config.get(nr)), 
nr);
     }
 
     @Override
     public DataGeneratorContainer visit(VarCharType varCharType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_STRING_LENGTH_DEFAULT);
+        ConfigOption<Integer> lenOption = 
getLengthOption(varCharType::getLength);
+        int length =
+                config.get(lenOption) == VarCharType.MAX_LENGTH
+                        ? RANDOM_STRING_LENGTH_DEFAULT
+                        : config.get(lenOption);
         ConfigOption<Float> nr = 
nullRate.floatType().defaultValue(NULL_RATE_DEFAULT);
         ConfigOption<Boolean> varLenOption = 
varLen.booleanType().defaultValue(false);
-
         return DataGeneratorContainer.of(
-                getRandomStringGenerator(config.get(lenOption))
+                getRandomStringGenerator(length)
                         .withNullRate(config.get(nr))
                         .withVarLen(config.get(varLenOption)),
                 lenOption,
@@ -177,30 +164,19 @@ public class RandomGeneratorVisitor extends 
DataGenVisitorBase {
 
     @Override
     public DataGeneratorContainer visit(BinaryType binaryType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT);
-        return 
DataGeneratorContainer.of(getRandomBytesGenerator(config.get(lenOption)), 
lenOption);
+        return 
DataGeneratorContainer.of(getRandomBytesGenerator(binaryType.getLength()));
     }
 
     @Override
     public DataGeneratorContainer visit(VarBinaryType varBinaryType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_BYTES_LENGTH_DEFAULT);
+        ConfigOption<Integer> lenOption = 
getLengthOption(varBinaryType::getLength);
         ConfigOption<Boolean> varLenOption = 
varLen.booleanType().defaultValue(false);
+        int length =
+                config.get(lenOption) == VarBinaryType.MAX_LENGTH
+                        ? RANDOM_BYTES_LENGTH_DEFAULT
+                        : config.get(lenOption);
         return DataGeneratorContainer.of(
-                
getRandomBytesGenerator(config.get(lenOption)).withVarLen(config.get(varLenOption)),
+                
getRandomBytesGenerator(length).withVarLen(config.get(varLenOption)),
                 lenOption,
                 varLenOption);
     }
@@ -366,14 +342,7 @@ public class RandomGeneratorVisitor extends 
DataGenVisitorBase {
 
     @Override
     public DataGeneratorContainer visit(ArrayType arrayType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+        ConfigOption<Integer> lenOption = getLengthOption(() -> 
RANDOM_COLLECTION_LENGTH_DEFAULT);
 
         String fieldName = name + "." + "element";
         DataGeneratorContainer container =
@@ -390,14 +359,7 @@ public class RandomGeneratorVisitor extends 
DataGenVisitorBase {
 
     @Override
     public DataGeneratorContainer visit(MultisetType multisetType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+        ConfigOption<Integer> lenOption = getLengthOption(() -> 
RANDOM_COLLECTION_LENGTH_DEFAULT);
 
         String fieldName = name + "." + "element";
         DataGeneratorContainer container =
@@ -420,14 +382,7 @@ public class RandomGeneratorVisitor extends 
DataGenVisitorBase {
 
     @Override
     public DataGeneratorContainer visit(MapType mapType) {
-        ConfigOption<Integer> lenOption =
-                key(DataGenConnectorOptionsUtil.FIELDS
-                                + "."
-                                + name
-                                + "."
-                                + DataGenConnectorOptionsUtil.LENGTH)
-                        .intType()
-                        .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT);
+        ConfigOption<Integer> lenOption = getLengthOption(() -> 
RANDOM_COLLECTION_LENGTH_DEFAULT);
 
         String keyName = name + "." + "key";
         String valName = name + "." + "value";
@@ -489,6 +444,16 @@ public class RandomGeneratorVisitor extends 
DataGenVisitorBase {
         throw new ValidationException("Unsupported type: " + logicalType);
     }
 
+    private ConfigOption<Integer> getLengthOption(Supplier<Integer> 
defaultLengthSupplier) {
+        return key(String.join(
+                        ".",
+                        DataGenConnectorOptionsUtil.FIELDS,
+                        name,
+                        DataGenConnectorOptionsUtil.LENGTH))
+                .intType()
+                .defaultValue(defaultLengthSupplier.get());
+    }
+
     private static RandomGenerator<StringData> getRandomStringGenerator(int 
length) {
         return new RandomGenerator<StringData>() {
             @Override
diff --git 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
index e137749fcfd..882ec105f01 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java
@@ -41,6 +41,8 @@ import org.apache.flink.util.InstantiationUtil;
 
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -65,6 +67,13 @@ class DataGenTableSourceFactoryTest {
                     Column.physical("f5", DataTypes.VARBINARY(4)),
                     Column.physical("f6", DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING())),
                     Column.physical("f7", DataTypes.STRING()));
+    private static final ResolvedSchema LENGTH_CONSTRAINED_SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("f0", DataTypes.CHAR(50)),
+                    Column.physical("f1", DataTypes.BINARY(40)),
+                    Column.physical("f2", DataTypes.VARCHAR(30)),
+                    Column.physical("f3", DataTypes.VARBINARY(20)),
+                    Column.physical("f4", DataTypes.STRING()));
 
     @Test
     void testDataTypeCoverage() throws Exception {
@@ -172,13 +181,6 @@ class DataGenTableSourceFactoryTest {
         descriptor.putString(
                 DataGenConnectorOptionsUtil.FIELDS + ".f3." + 
DataGenConnectorOptionsUtil.MAX_PAST,
                 "5s");
-
-        descriptor.putString(
-                DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.KIND,
-                DataGenConnectorOptionsUtil.RANDOM);
-        descriptor.putLong(
-                DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.LENGTH,
-                2);
         descriptor.putString(
                 DataGenConnectorOptionsUtil.FIELDS + ".f5." + 
DataGenConnectorOptionsUtil.KIND,
                 DataGenConnectorOptionsUtil.SEQUENCE);
@@ -237,12 +239,10 @@ class DataGenTableSourceFactoryTest {
         for (RowData row : results) {
             assertThat(row.getString(0).toString())
                     
.hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT);
-            assertThat(row.getString(1).toString())
-                    
.hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT);
+            assertThat(row.getString(1).toString()).hasSize(20);
             assertThat(row.getBinary(2))
                     
.hasSize(RandomGeneratorVisitor.RANDOM_BYTES_LENGTH_DEFAULT);
-            assertThat(row.getBinary(3))
-                    
.hasSize(RandomGeneratorVisitor.RANDOM_BYTES_LENGTH_DEFAULT);
+            assertThat(row.getBinary(3)).hasSize(4);
         }
 
         descriptor.putBoolean(
@@ -283,22 +283,85 @@ class DataGenTableSourceFactoryTest {
         assertThat(sizeVarBinary.size()).isGreaterThan(1);
         assertThat(sizeVarChar.size()).isGreaterThan(1);
 
-        assertThatThrownBy(
-                        () -> {
-                            descriptor.putBoolean(
-                                    DataGenConnectorOptionsUtil.FIELDS
-                                            + ".f4."
-                                            + 
DataGenConnectorOptionsUtil.VAR_LEN,
-                                    true);
+        assertException(
+                schema,
+                descriptor,
+                "f4",
+                null,
+                true,
+                String.format(
+                        "Only supports specifying '%s' option for 
variable-length types (VARCHAR/STRING/VARBINARY/BYTES). The type of field '%s' 
is not within this range.",
+                        DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4"));
+    }
 
-                            runGenerator(schema, descriptor);
-                        })
-                .satisfies(
-                        anyCauseMatches(
-                                ValidationException.class,
-                                String.format(
-                                        "Only supports specifying '%s' option 
for variable-length types (varchar, string, varbinary, bytes). The type of 
field %s is not within this range.",
-                                        
DataGenConnectorOptions.FIELD_VAR_LEN.key(), "f4")));
+    @Test
+    void testVariableLengthDataType() throws Exception {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        final int rowsNumber = 200;
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+        List<RowData> results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+        assertThat(results).hasSize(rowsNumber);
+
+        for (RowData row : results) {
+            assertThat(row.getString(2).toString()).hasSize(30);
+            assertThat(row.getBinary(3)).hasSize(20);
+            assertThat(row.getString(4).toString())
+                    
.hasSize(RandomGeneratorVisitor.RANDOM_STRING_LENGTH_DEFAULT);
+        }
+
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.RANDOM);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f2." + 
DataGenConnectorOptionsUtil.LENGTH,
+                25);
+        descriptor.putString(
+                DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.KIND,
+                DataGenConnectorOptionsUtil.RANDOM);
+        descriptor.putLong(
+                DataGenConnectorOptionsUtil.FIELDS + ".f4." + 
DataGenConnectorOptionsUtil.LENGTH,
+                9999);
+
+        results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, descriptor);
+
+        for (RowData row : results) {
+            assertThat(row.getString(2).toString()).hasSize(25);
+            assertThat(row.getString(4).toString()).hasSize(9999);
+        }
+
+        assertException(
+                LENGTH_CONSTRAINED_SCHEMA,
+                descriptor,
+                "f3",
+                21,
+                null,
+                "Custom length '21' for variable-length type 
(VARCHAR/STRING/VARBINARY/BYTES) field 'f3' should be shorter than '20' defined 
in the schema.");
+    }
+
+    @Test
+    void testFixedLengthDataType() throws Exception {
+        DescriptorProperties descriptor = new DescriptorProperties();
+        final int rowsNumber = 200;
+        descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
+        descriptor.putLong(DataGenConnectorOptions.NUMBER_OF_ROWS.key(), 
rowsNumber);
+
+        List<RowData> results = runGenerator(LENGTH_CONSTRAINED_SCHEMA, 
descriptor);
+        assertThat(results).hasSize(rowsNumber);
+
+        for (RowData row : results) {
+            assertThat(row.getString(0).toString()).hasSize(50);
+            assertThat(row.getBinary(1)).hasSize(40);
+        }
+
+        assertException(
+                LENGTH_CONSTRAINED_SCHEMA,
+                descriptor,
+                "f0",
+                20,
+                null,
+                "Custom length for fixed-length type (CHAR/BINARY) field 'f0' 
is not supported.");
     }
 
     private List<RowData> runGenerator(ResolvedSchema schema, 
DescriptorProperties descriptor)
@@ -518,6 +581,46 @@ class DataGenTableSourceFactoryTest {
                         anyCauseMatches("Could not parse value 'Wrong' for key 
'fields.f0.start'"));
     }
 
+    private void assertException(
+            ResolvedSchema schema,
+            DescriptorProperties descriptor,
+            String fieldName,
+            @Nullable Integer len,
+            @Nullable Boolean varLen,
+            String expectedMessage) {
+        assertThatThrownBy(
+                        () -> {
+                            descriptor.putString(
+                                    String.join(
+                                            ".",
+                                            DataGenConnectorOptionsUtil.FIELDS,
+                                            fieldName,
+                                            DataGenConnectorOptionsUtil.KIND),
+                                    DataGenConnectorOptionsUtil.RANDOM);
+                            if (len != null) {
+                                descriptor.putLong(
+                                        String.join(
+                                                ".",
+                                                
DataGenConnectorOptionsUtil.FIELDS,
+                                                fieldName,
+                                                
DataGenConnectorOptionsUtil.LENGTH),
+                                        len);
+                            }
+                            if (varLen != null) {
+                                descriptor.putBoolean(
+                                        String.join(
+                                                ".",
+                                                
DataGenConnectorOptionsUtil.FIELDS,
+                                                fieldName,
+                                                
DataGenConnectorOptionsUtil.VAR_LEN),
+                                        varLen);
+                            }
+
+                            runGenerator(schema, descriptor);
+                        })
+                .satisfies(anyCauseMatches(ValidationException.class, 
expectedMessage));
+    }
+
     private static class TestContext implements 
SourceFunction.SourceContext<RowData> {
 
         private final Object lock = new Object();

Reply via email to