This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit abd3da9e0a28aaa526a569133feadfe0c9af68cc Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Tue May 11 17:20:57 2021 +0200 [FLINK-22475][table-api-java-bridge] Add placeholder options for datagen connector This closes #15896. --- .../flink/table/factories/DataGenOptions.java | 103 +++++++++++++++++++++ .../table/factories/DataGenTableSourceFactory.java | 54 ++++------- .../factories/datagen/RandomGeneratorVisitor.java | 19 ++-- .../datagen/SequenceGeneratorVisitor.java | 8 +- .../factories/DataGenTableSourceFactoryTest.java | 68 +++++++------- 5 files changed, 167 insertions(+), 85 deletions(-) diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java new file mode 100644 index 0000000..e445359 --- /dev/null +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenOptions.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** {@link ConfigOption}s for {@link DataGenTableSourceFactory}. */ +@Internal +public class DataGenOptions { + + public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L; + + public static final String FIELDS = "fields"; + public static final String KIND = "kind"; + public static final String START = "start"; + public static final String END = "end"; + public static final String MIN = "min"; + public static final String MAX = "max"; + public static final String LENGTH = "length"; + + public static final String SEQUENCE = "sequence"; + public static final String RANDOM = "random"; + + public static final ConfigOption<Long> ROWS_PER_SECOND = + key("rows-per-second") + .longType() + .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE) + .withDescription("Rows per second to control the emit rate."); + + public static final ConfigOption<Long> NUMBER_OF_ROWS = + key("number-of-rows") + .longType() + .noDefaultValue() + .withDescription( + "Total number of rows to emit. By default, the source is unbounded."); + + /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ + public static final ConfigOption<String> FIELD_KIND = + ConfigOptions.key(String.format("%s.#.%s", FIELDS, KIND)) + .stringType() + .defaultValue("random") + .withDescription("Generator of this '#' field. Can be 'sequence' or 'random'."); + + /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ + public static final ConfigOption<String> FIELD_MIN = + ConfigOptions.key(String.format("%s.#.%s", FIELDS, MIN)) + .stringType() + .noDefaultValue() + .withDescription( + "Minimum value to generate for fields of kind 'random'. Minimum value possible for the type of the field."); + + /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ + public static final ConfigOption<String> FIELD_MAX = + ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAX)) + .stringType() + .noDefaultValue() + .withDescription( + "Maximum value to generate for fields of kind 'random'. Maximum value possible for the type of the field."); + + /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ + public static final ConfigOption<Integer> FIELD_LENGTH = + ConfigOptions.key(String.format("%s.#.%s", FIELDS, LENGTH)) + .intType() + .defaultValue(100) + .withDescription( + "Size or length of the collection for generating char/varchar/string/array/map/multiset types."); + + /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ + public static final ConfigOption<String> FIELD_START = + ConfigOptions.key(String.format("%s.#.%s", FIELDS, START)) + .stringType() + .noDefaultValue() + .withDescription("Start value of sequence generator."); + + /** Placeholder {@link ConfigOption}. Not used for retrieving values. */ + public static final ConfigOption<String> FIELD_END = + ConfigOptions.key(String.format("%s.#.%s", FIELDS, END)) + .stringType() + .noDefaultValue() + .withDescription("End value of sequence generator."); + + private DataGenOptions() {} +} diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java index 4cce51d..622f511 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/DataGenTableSourceFactory.java @@ -46,31 +46,6 @@ import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; public class DataGenTableSourceFactory implements DynamicTableSourceFactory { public static final String IDENTIFIER = "datagen"; - public static final Long ROWS_PER_SECOND_DEFAULT_VALUE = 10000L; - - public static final ConfigOption<Long> ROWS_PER_SECOND = - key("rows-per-second") - .longType() - .defaultValue(ROWS_PER_SECOND_DEFAULT_VALUE) - .withDescription("Rows per second to control the emit rate."); - - public static final ConfigOption<Long> NUMBER_OF_ROWS = - key("number-of-rows") - .longType() - .noDefaultValue() - .withDescription( - "Total number of rows to emit. By default, the source is unbounded."); - - public static final String FIELDS = "fields"; - public static final String KIND = "kind"; - public static final String START = "start"; - public static final String END = "end"; - public static final String MIN = "min"; - public static final String MAX = "max"; - public static final String LENGTH = "length"; - - public static final String SEQUENCE = "sequence"; - public static final String RANDOM = "random"; @Override public String factoryIdentifier() { @@ -85,8 +60,17 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { @Override public Set<ConfigOption<?>> optionalOptions() { Set<ConfigOption<?>> options = new HashSet<>(); - options.add(ROWS_PER_SECOND); - options.add(NUMBER_OF_ROWS); + options.add(DataGenOptions.ROWS_PER_SECOND); + options.add(DataGenOptions.NUMBER_OF_ROWS); + + // Placeholder options + options.add(DataGenOptions.FIELD_KIND); + options.add(DataGenOptions.FIELD_MIN); + options.add(DataGenOptions.FIELD_MAX); + options.add(DataGenOptions.FIELD_LENGTH); + options.add(DataGenOptions.FIELD_START); + options.add(DataGenOptions.FIELD_END); + return options; } @@ -105,7 +89,9 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { DataType type = schema.getFieldDataTypes()[i]; ConfigOption<String> kind = - key(FIELDS + "." + name + "." + KIND).stringType().defaultValue(RANDOM); + key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.KIND) + .stringType() + .defaultValue(DataGenOptions.RANDOM); DataGeneratorContainer container = createContainer(name, type, options.get(kind), options); fieldGenerators[i] = container.getGenerator(); @@ -118,8 +104,8 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { Set<String> consumedOptionKeys = new HashSet<>(); consumedOptionKeys.add(CONNECTOR.key()); - consumedOptionKeys.add(ROWS_PER_SECOND.key()); - consumedOptionKeys.add(NUMBER_OF_ROWS.key()); + consumedOptionKeys.add(DataGenOptions.ROWS_PER_SECOND.key()); + consumedOptionKeys.add(DataGenOptions.NUMBER_OF_ROWS.key()); optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add); FactoryUtil.validateUnconsumedKeys( factoryIdentifier(), options.keySet(), consumedOptionKeys); @@ -129,16 +115,16 @@ public class DataGenTableSourceFactory implements DynamicTableSourceFactory { fieldGenerators, name, schema, - options.get(ROWS_PER_SECOND), - options.get(NUMBER_OF_ROWS)); + options.get(DataGenOptions.ROWS_PER_SECOND), + options.get(DataGenOptions.NUMBER_OF_ROWS)); } private DataGeneratorContainer createContainer( String name, DataType type, String kind, ReadableConfig options) { switch (kind) { - case RANDOM: + case DataGenOptions.RANDOM: return type.getLogicalType().accept(new RandomGeneratorVisitor(name, options)); - case SEQUENCE: + case DataGenOptions.SEQUENCE: return type.getLogicalType().accept(new SequenceGeneratorVisitor(name, options)); default: throw new ValidationException("Unsupported generator kind: " + kind); diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java index f18a851..13c93af 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/RandomGeneratorVisitor.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.GenericArrayData; import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.DataGenOptions; import org.apache.flink.table.factories.datagen.types.DataGeneratorMapper; import org.apache.flink.table.factories.datagen.types.DecimalDataRandomGenerator; import org.apache.flink.table.factories.datagen.types.RowDataGenerator; @@ -55,10 +56,6 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.configuration.ConfigOptions.key; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN; /** Creates a random {@link DataGeneratorContainer} for a particular logical type. */ @Internal @@ -76,8 +73,8 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { public RandomGeneratorVisitor(String name, ReadableConfig config) { super(name, config); - this.minKey = key(FIELDS + "." + name + "." + MIN); - this.maxKey = key(FIELDS + "." + name + "." + MAX); + this.minKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MIN); + this.maxKey = key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.MAX); } @Override @@ -88,7 +85,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(CharType booleanType) { ConfigOption<Integer> lenOption = - key(FIELDS + "." + name + "." + LENGTH) + key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH) .intType() .defaultValue(RANDOM_STRING_LENGTH_DEFAULT); return DataGeneratorContainer.of( @@ -98,7 +95,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(VarCharType booleanType) { ConfigOption<Integer> lenOption = - key(FIELDS + "." + name + "." + LENGTH) + key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH) .intType() .defaultValue(RANDOM_STRING_LENGTH_DEFAULT); return DataGeneratorContainer.of( @@ -190,7 +187,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(ArrayType arrayType) { ConfigOption<Integer> lenOption = - key(FIELDS + "." + name + "." + LENGTH) + key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH) .intType() .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT); @@ -208,7 +205,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(MultisetType multisetType) { ConfigOption<Integer> lenOption = - key(FIELDS + "." + name + "." + LENGTH) + key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH) .intType() .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT); @@ -230,7 +227,7 @@ public class RandomGeneratorVisitor extends DataGenVisitorBase { @Override public DataGeneratorContainer visit(MapType mapType) { ConfigOption<Integer> lenOption = - key(FIELDS + "." + name + "." + LENGTH) + key(DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.LENGTH) .intType() .defaultValue(RANDOM_COLLECTION_LENGTH_DEFAULT); diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java index 214697b..8ce3e51 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/factories/datagen/SequenceGeneratorVisitor.java @@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator; import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.factories.DataGenOptions; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BooleanType; import org.apache.flink.table.types.logical.CharType; @@ -38,9 +39,6 @@ import org.apache.flink.table.types.logical.TinyIntType; import org.apache.flink.table.types.logical.VarCharType; import static org.apache.flink.configuration.ConfigOptions.key; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.END; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.START; /** Creates a sequential {@link DataGeneratorContainer} for a particular logical type. */ @Internal @@ -65,8 +63,8 @@ public class SequenceGeneratorVisitor extends DataGenVisitorBase { this.config = config; - this.startKeyStr = FIELDS + "." + name + "." + START; - this.endKeyStr = FIELDS + "." + name + "." + END; + this.startKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.START; + this.endKeyStr = DataGenOptions.FIELDS + "." + name + "." + DataGenOptions.END; ConfigOptions.OptionBuilder startKey = key(startKeyStr); ConfigOptions.OptionBuilder endKey = key(endKeyStr); diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java index fc07257..5943a9e 100644 --- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java +++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/factories/DataGenTableSourceFactoryTest.java @@ -44,17 +44,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.END; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.FIELDS; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.KIND; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.LENGTH; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.MAX; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.MIN; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.NUMBER_OF_ROWS; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.RANDOM; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.ROWS_PER_SECOND; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.SEQUENCE; -import static org.apache.flink.table.factories.DataGenTableSourceFactory.START; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.junit.Assert.assertTrue; @@ -104,7 +93,7 @@ public class DataGenTableSourceFactoryTest { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(NUMBER_OF_ROWS.key(), "10"); + descriptor.putString(DataGenOptions.NUMBER_OF_ROWS.key(), "10"); // add min max option for numeric types descriptor.putString("fields.f4.min", "1.0"); @@ -138,18 +127,21 @@ public class DataGenTableSourceFactoryTest { public void testSource() throws Exception { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putLong(ROWS_PER_SECOND.key(), 100); + descriptor.putLong(DataGenOptions.ROWS_PER_SECOND.key(), 100); - descriptor.putString(FIELDS + ".f0." + KIND, RANDOM); - descriptor.putLong(FIELDS + ".f0." + LENGTH, 20); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM); + descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.LENGTH, 20); - descriptor.putString(FIELDS + ".f1." + KIND, RANDOM); - descriptor.putLong(FIELDS + ".f1." + MIN, 10); - descriptor.putLong(FIELDS + ".f1." + MAX, 100); + descriptor.putString( + DataGenOptions.FIELDS + ".f1." + DataGenOptions.KIND, DataGenOptions.RANDOM); + descriptor.putLong(DataGenOptions.FIELDS + ".f1." + DataGenOptions.MIN, 10); + descriptor.putLong(DataGenOptions.FIELDS + ".f1." + DataGenOptions.MAX, 100); - descriptor.putString(FIELDS + ".f2." + KIND, SEQUENCE); - descriptor.putLong(FIELDS + ".f2." + START, 50); - descriptor.putLong(FIELDS + ".f2." + END, 60); + descriptor.putString( + DataGenOptions.FIELDS + ".f2." + DataGenOptions.KIND, DataGenOptions.SEQUENCE); + descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.START, 50); + descriptor.putLong(DataGenOptions.FIELDS + ".f2." + DataGenOptions.END, 60); List<RowData> results = runGenerator(SCHEMA, descriptor); @@ -191,9 +183,10 @@ public class DataGenTableSourceFactoryTest { public void testSequenceCheckpointRestore() throws Exception { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE); - descriptor.putLong(FIELDS + ".f0." + START, 0); - descriptor.putLong(FIELDS + ".f0." + END, 100); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE); + descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0); + descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, 100); DynamicTableSource dynamicTableSource = createTableSource( @@ -225,8 +218,9 @@ public class DataGenTableSourceFactoryTest { try { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE); - descriptor.putLong(FIELDS + ".f0." + END, 100); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE); + descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, 100); createTableSource( ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), @@ -249,8 +243,9 @@ public class DataGenTableSourceFactoryTest { try { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE); - descriptor.putLong(FIELDS + ".f0." + START, 0); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE); + descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0); createTableSource( ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), @@ -294,8 +289,9 @@ public class DataGenTableSourceFactoryTest { try { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(FIELDS + ".f0." + KIND, RANDOM); - descriptor.putLong(FIELDS + ".f0." + START, 0); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM); + descriptor.putLong(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, 0); createTableSource( ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), @@ -316,8 +312,9 @@ public class DataGenTableSourceFactoryTest { try { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(FIELDS + ".f0." + KIND, RANDOM); - descriptor.putInt(FIELDS + ".f0." + LENGTH, 100); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.RANDOM); + descriptor.putInt(DataGenOptions.FIELDS + ".f0." + DataGenOptions.LENGTH, 100); createTableSource( ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())), @@ -338,9 +335,10 @@ public class DataGenTableSourceFactoryTest { try { DescriptorProperties descriptor = new DescriptorProperties(); descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen"); - descriptor.putString(FIELDS + ".f0." + KIND, SEQUENCE); - descriptor.putString(FIELDS + ".f0." + START, "Wrong"); - descriptor.putString(FIELDS + ".f0." + END, "Wrong"); + descriptor.putString( + DataGenOptions.FIELDS + ".f0." + DataGenOptions.KIND, DataGenOptions.SEQUENCE); + descriptor.putString(DataGenOptions.FIELDS + ".f0." + DataGenOptions.START, "Wrong"); + descriptor.putString(DataGenOptions.FIELDS + ".f0." + DataGenOptions.END, "Wrong"); createTableSource( ResolvedSchema.of(Column.physical("f0", DataTypes.BIGINT())),