This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b6ca017e095de4b055e58af195e1f0a00e312d6e Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Thu Dec 9 13:50:10 2021 +0100 [hotfix][table] Make use of VarCharType.STRING_TYPE Replace occurrences of `new VarCharType(VarCharType.MAX_LENGTH)` with the new constant `VarCharType.STRING_TYPE`. --- .../flink/table/types/logical/utils/LogicalTypeParser.java | 2 +- .../java/org/apache/flink/table/types/DataTypesTest.java | 2 +- .../apache/flink/table/types/LogicalCommonTypeTest.java | 4 ++-- .../apache/flink/table/types/LogicalTypeParserTest.java | 2 +- .../table/types/extraction/DataTypeExtractorTest.java | 9 +++------ .../flink/table/planner/plan/type/FlinkReturnTypes.java | 4 ++-- .../flink/table/planner/codegen/calls/StringCallGen.scala | 2 +- .../flink/table/planner/codegen/SortCodeGeneratorTest.java | 2 +- .../org/apache/flink/table/api/batch/ExplainTest.scala | 2 +- .../org/apache/flink/table/api/stream/ExplainTest.scala | 2 +- .../flink/table/planner/calcite/FlinkTypeFactoryTest.scala | 6 +++--- .../flink/table/planner/codegen/agg/AggTestBase.scala | 4 ++-- .../table/planner/codegen/agg/batch/BatchAggTestBase.scala | 2 +- .../codegen/agg/batch/HashAggCodeGeneratorTest.scala | 2 +- .../codegen/agg/batch/SortAggCodeGeneratorTest.scala | 4 ++-- .../planner/expressions/utils/ExpressionTestBase.scala | 2 +- .../table/planner/plan/batch/sql/DagOptimizationTest.scala | 2 +- .../table/planner/plan/metadata/MetadataTestUtil.scala | 6 +++--- .../planner/plan/stream/sql/DagOptimizationTest.scala | 2 +- .../table/planner/plan/stream/sql/LegacySinkTest.scala | 2 +- .../plan/stream/sql/MiniBatchIntervalInferTest.scala | 2 +- .../runtime/batch/sql/PartitionableSinkITCase.scala | 2 +- .../table/planner/runtime/batch/sql/UnionITCase.scala | 2 +- .../table/planner/runtime/stream/sql/CalcITCase.scala | 4 ++-- .../org/apache/flink/table/data/BinaryArrayDataTest.java | 3 +-- .../org/apache/flink/table/data/BinaryRowDataTest.java | 3 +-- 
.../apache/flink/table/data/DataFormatConvertersTest.java | 4 ++-- .../aggregate/window/SlicingWindowAggOperatorTest.java | 3 +-- .../deduplicate/ProcTimeDeduplicateFunctionTestBase.java | 3 +-- .../deduplicate/RowTimeDeduplicateFunctionTestBase.java | 3 +-- .../window/RowTimeWindowDeduplicateOperatorTest.java | 3 +-- .../operators/join/RandomSortMergeInnerJoinTest.java | 6 +++--- .../operators/join/String2HashJoinOperatorTest.java | 14 ++++++-------- .../operators/join/String2SortMergeJoinOperatorTest.java | 12 +++++------- .../join/interval/TimeIntervalStreamJoinTestBase.java | 6 +++--- .../join/temporal/TemporalProcessTimeJoinOperatorTest.java | 6 +++--- .../join/temporal/TemporalTimeJoinOperatorTestBase.java | 12 +++++------- .../operators/join/window/WindowJoinOperatorTest.java | 6 +++--- .../over/ProcTimeRangeBoundedPrecedingFunctionTest.java | 2 +- .../runtime/operators/over/RowTimeOverWindowTestBase.java | 4 +--- .../table/runtime/operators/rank/TopNFunctionTestBase.java | 8 ++------ .../operators/rank/window/WindowRankOperatorTest.java | 5 ++--- .../runtime/operators/sort/ProcTimeSortOperatorTest.java | 5 +---- .../runtime/operators/sort/RowTimeSortOperatorTest.java | 10 ++-------- .../runtime/operators/sort/StreamSortOperatorTest.java | 2 +- .../operators/window/WindowOperatorContractTest.java | 3 +-- .../table/runtime/operators/window/WindowOperatorTest.java | 11 ++++------- .../table/runtime/types/DataTypePrecisionFixerTest.java | 2 +- .../table/runtime/typeutils/RowDataSerializerTest.java | 6 +++--- .../util/collections/binary/BytesHashMapTestBase.java | 2 +- .../util/collections/binary/BytesMultiMapTestBase.java | 4 ++-- 51 files changed, 93 insertions(+), 128 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java index 1c69d77..15b5daa 100644 --- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeParser.java @@ -528,7 +528,7 @@ public final class LogicalTypeParser { case VARCHAR: return parseVarCharType(); case STRING: - return new VarCharType(VarCharType.MAX_LENGTH); + return VarCharType.STRING_TYPE; case BOOLEAN: return new BooleanType(); case BINARY: diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java index 9d5288e..a658285 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypesTest.java @@ -118,7 +118,7 @@ public class DataTypesTest { .expectLogicalType(new VarCharType(2)) .expectConversionClass(String.class), TestSpec.forDataType(STRING()) - .expectLogicalType(new VarCharType(VarCharType.MAX_LENGTH)) + .expectLogicalType(VarCharType.STRING_TYPE) .expectConversionClass(String.class), TestSpec.forDataType(BOOLEAN()) .expectLogicalType(new BooleanType()) diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java index 29745ec..ad76ea6 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalCommonTypeTest.java @@ -141,8 +141,8 @@ public class LogicalCommonTypeTest { // VARCHAR types of different length { - Arrays.asList(new VarCharType(2), new VarCharType(VarCharType.MAX_LENGTH)), - new VarCharType(VarCharType.MAX_LENGTH) + Arrays.asList(new VarCharType(2), 
VarCharType.STRING_TYPE), + VarCharType.STRING_TYPE }, // mixed VARCHAR and CHAR types diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java index 1a58475..88758f2 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeParserTest.java @@ -91,7 +91,7 @@ public class LogicalTypeParserTest { TestSpec.forString("CHAR(33)").expectType(new CharType(33)), TestSpec.forString("VARCHAR").expectType(new VarCharType()), TestSpec.forString("VARCHAR(33)").expectType(new VarCharType(33)), - TestSpec.forString("STRING").expectType(new VarCharType(VarCharType.MAX_LENGTH)), + TestSpec.forString("STRING").expectType(VarCharType.STRING_TYPE), TestSpec.forString("BOOLEAN").expectType(new BooleanType()), TestSpec.forString("BINARY").expectType(new BinaryType()), TestSpec.forString("BINARY(33)").expectType(new BinaryType(33)), diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java index f023a50..dd58c82 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java @@ -619,8 +619,7 @@ public class DataTypeExtractorTest { new StructuredAttribute("intField", new IntType(true)), new StructuredAttribute("primitiveBooleanField", new BooleanType(false)), new StructuredAttribute("primitiveIntField", new IntType(false)), - new StructuredAttribute( - "stringField", new VarCharType(VarCharType.MAX_LENGTH)))); + new 
StructuredAttribute("stringField", VarCharType.STRING_TYPE))); builder.setFinal(true); builder.setInstantiable(true); final StructuredType structuredType = builder.build(); @@ -641,9 +640,7 @@ public class DataTypeExtractorTest { builder.attributes( Arrays.asList( new StructuredAttribute( - "mapField", - new MapType( - new VarCharType(VarCharType.MAX_LENGTH), new IntType())), + "mapField", new MapType(VarCharType.STRING_TYPE, new IntType())), new StructuredAttribute( "simplePojoField", getSimplePojoDataType(simplePojoClass).getLogicalType()), @@ -700,7 +697,7 @@ public class DataTypeExtractorTest { final StructuredType.Builder builder = StructuredType.newBuilder(Tuple2.class); builder.attributes( Arrays.asList( - new StructuredAttribute("f0", new VarCharType(VarCharType.MAX_LENGTH)), + new StructuredAttribute("f0", VarCharType.STRING_TYPE), new StructuredAttribute("f1", new BooleanType()))); builder.setFinal(true); builder.setInstantiable(true); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java index b495a98..1ff51b4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/type/FlinkReturnTypes.java @@ -127,6 +127,6 @@ public class FlinkReturnTypes { ((FlinkTypeFactory) factory) .createFieldTypeFromLogicalType( new MapType( - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)))); + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE))); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala index b605ddc..7619486 100644 
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala @@ -765,7 +765,7 @@ object StringCallGen { operands: Seq[GeneratedExpression]): GeneratedExpression = { val className = classOf[SqlFunctionUtils].getCanonicalName val t = new MapType( - new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH)) + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE) val converter = DataFormatConverters.getConverterForDataType( DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())) val converterTerm = ctx.addReusableObject(converter, "mapConverter") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java index 8a26c3e..931b0ec 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java @@ -105,7 +105,7 @@ public class SortCodeGeneratorTest { new BigIntType(), new FloatType(), new DoubleType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new DecimalType(18, 2), new DecimalType(38, 18), new VarBinaryType(VarBinaryType.MAX_LENGTH), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala index cd20033..8314f67 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/ExplainTest.scala @@ -43,7 +43,7 @@ class 
ExplainTest(extended: Boolean) extends TableTestBase { util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c) util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f) - val STRING = new VarCharType(VarCharType.MAX_LENGTH) + val STRING = VarCharType.STRING_TYPE val LONG = new BigIntType() val INT = new IntType() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala index cadbaa5..a66db23 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala @@ -46,7 +46,7 @@ class ExplainTest(extended: Boolean) extends TableTestBase { util.addDataStream[(Int, Long, String)]("MyTable1", 'a, 'b, 'c) util.addDataStream[(Int, Long, String)]("MyTable2", 'd, 'e, 'f) - val STRING = new VarCharType(VarCharType.MAX_LENGTH) + val STRING = VarCharType.STRING_TYPE val LONG = new BigIntType() val INT = new IntType() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala index d8be0ea..d6c8c7e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactoryTest.scala @@ -69,7 +69,7 @@ class FlinkTypeFactoryTest { test(new NullType()) test(new BooleanType()) test(new TinyIntType()) - test(new VarCharType(VarCharType.MAX_LENGTH)) + test(VarCharType.STRING_TYPE) test(new DoubleType()) test(new FloatType()) test(new IntType()) @@ -82,8 +82,8 @@ class FlinkTypeFactoryTest { test(new LocalZonedTimestampType(3)) test(new 
ArrayType(new DoubleType())) - test(new MapType(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH))) - test(RowType.of(new DoubleType(), new VarCharType(VarCharType.MAX_LENGTH))) + test(new MapType(new DoubleType(), VarCharType.STRING_TYPE)) + test(RowType.of(new DoubleType(), VarCharType.STRING_TYPE)) test(new RawType[DayOfWeek]( classOf[DayOfWeek], new KryoSerializer[DayOfWeek](classOf[DayOfWeek], new ExecutionConfig))) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala index cd4d0d5..3fac689 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/AggTestBase.scala @@ -57,8 +57,8 @@ abstract class AggTestBase(isBatchMode: Boolean) { private val planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase] val inputNames = Array("f0", "f1", "f2", "f3", "f4") val inputTypes: Array[LogicalType] = Array( - new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new DoubleType(), new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH)) + VarCharType.STRING_TYPE, new BigIntType(), new DoubleType(), new BigIntType(), + VarCharType.STRING_TYPE) val inputType: RowType = RowType.of(inputTypes, inputNames) val relBuilder: RelBuilder = planner.getRelBuilder.values( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala index d71a2a3..c5bf13b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/BatchAggTestBase.scala @@ -43,7 +43,7 @@ abstract class BatchAggTestBase extends AggTestBase(isBatchMode = true) { val globalOutputType = RowType.of( Array[LogicalType]( - new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType(), new DoubleType(), new BigIntType()), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala index 720263b..62ef130 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGeneratorTest.scala @@ -35,7 +35,7 @@ class HashAggCodeGeneratorTest extends BatchAggTestBase { val localOutputType = RowType.of( Array[LogicalType]( - new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(), new DoubleType(), new BigIntType(), new BigIntType(), new BigIntType()), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala index f69766f1..90f981c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGeneratorTest.scala @@ -32,7 +32,7 @@ class 
SortAggCodeGeneratorTest extends BatchAggTestBase { val localOutputType = RowType.of( Array[LogicalType]( - new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(), new DoubleType(), new BigIntType(), fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)), @@ -95,7 +95,7 @@ class SortAggCodeGeneratorTest extends BatchAggTestBase { : (CodeGenOperatorFactory[RowData], RowType, RowType) = { val localOutputType = RowType.of( Array[LogicalType]( - new VarCharType(VarCharType.MAX_LENGTH), new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(), new DoubleType(), new BigIntType(), fromTypeInfoToLogicalType(imperativeAggFunc.getAccumulatorType)), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index b62f158..48898e6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -374,7 +374,7 @@ abstract class ExpressionTestBase { // generate code val resultType = RowType.of(Seq.fill(rexNodes.size)( - new VarCharType(VarCharType.MAX_LENGTH)): _*) + VarCharType.STRING_TYPE): _*) val exprs = stringTestExprs.map(exprGenerator.generateExpression) val genExpr = exprGenerator.generateResultExpression(exprs, resultType, classOf[BinaryRowData]) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala index 50701b9..b6fc21e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.scala @@ -34,7 +34,7 @@ class DagOptimizationTest extends TableTestBase { util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f) - val STRING = new VarCharType(VarCharType.MAX_LENGTH) + val STRING = VarCharType.STRING_TYPE val LONG = new BigIntType() val INT = new IntType() val DOUBLE = new DoubleType() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala index f12993f..4759afa 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala @@ -198,7 +198,7 @@ object MetadataTestUtil { val fieldNames = Array("a", "b", "c", "proctime", "rowtime") val fieldTypes = Array[LogicalType]( new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new IntType(), new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3), new TimestampType(true, TimestampKind.ROWTIME, 3)) @@ -217,7 +217,7 @@ object MetadataTestUtil { val fieldNames = Array("a", "b", "c", "proctime", "rowtime") val fieldTypes = Array[LogicalType]( new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new IntType(), new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3), new TimestampType(true, 
TimestampKind.ROWTIME, 3)) @@ -238,7 +238,7 @@ object MetadataTestUtil { val fieldTypes = Array[LogicalType]( new IntType(), new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3), new TimestampType(true, TimestampKind.ROWTIME, 3)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala index 4502232..414b044 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.scala @@ -32,7 +32,7 @@ class DagOptimizationTest extends TableTestBase { util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) util.addTableSource[(Int, Long, String)]("MyTable1", 'd, 'e, 'f) - val STRING = new VarCharType(VarCharType.MAX_LENGTH) + val STRING = VarCharType.STRING_TYPE val LONG = new BigIntType() val INT = new IntType() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala index b952a9c..6244402 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/LegacySinkTest.scala @@ -31,7 +31,7 @@ class LegacySinkTest extends TableTestBase { private val util = streamTestUtil() util.addDataStream[(Int, Long, String)]("MyTable", 'a, 'b, 'c) - val STRING = new VarCharType(VarCharType.MAX_LENGTH) + val STRING = VarCharType.STRING_TYPE val LONG = new 
BigIntType() val INT = new IntType() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala index eb2f7a3..0cf621c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.scala @@ -34,7 +34,7 @@ import org.junit.{Before, Test} class MiniBatchIntervalInferTest extends TableTestBase { private val util = streamTestUtil() - val STRING = new VarCharType(VarCharType.MAX_LENGTH) + val STRING = VarCharType.STRING_TYPE val LONG = new BigIntType() val INT = new IntType() diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala index bdba353..13bced6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSinkITCase.scala @@ -255,7 +255,7 @@ object PartitionableSinkITCase { } val fieldNames = Array("a", "b", "c") - val dataType = Array(new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)) + val dataType = Array(new IntType(), new BigIntType(), VarCharType.STRING_TYPE) val dataNullables = Array(true, true, true) val testData = Seq( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala index b7a4302..27451d4 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/UnionITCase.scala @@ -33,7 +33,7 @@ import scala.collection.Seq class UnionITCase extends BatchTestBase { val type6 = InternalTypeInfo.ofFields( - new IntType(), new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)) + new IntType(), new BigIntType(), VarCharType.STRING_TYPE) val data6 = Seq( binaryRow(type6.toRowFieldTypes, 1, 1L, fromString("Hi")), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 1d887a1..b5e55ae 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -152,8 +152,8 @@ class CalcITCase extends StreamingTestBase { tEnv.registerTable("MyTableRow", t) val outputType = InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, new IntType()) val result = tEnv.sqlQuery(sqlQuery).toAppendStream[RowData] diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java index e7d5773..fb8a5fc 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java +++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java @@ -501,8 +501,7 @@ public class BinaryArrayDataTest { writer.writeRow( 0, GenericRowData.of(fromString("1"), 1), - new RowDataSerializer( - RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()))); + new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType()))); writer.setNullAt(1); writer.complete(); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java index c0c91a4..1a200e8 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java @@ -484,8 +484,7 @@ public class BinaryRowDataTest { writer.writeRow( 0, GenericRowData.of(fromString("1"), 1), - new RowDataSerializer( - RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType()))); + new RowDataSerializer(RowType.of(VarCharType.STRING_TYPE, new IntType()))); writer.setNullAt(1); writer.complete(); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java index de0c65b..852633f 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataFormatConvertersTest.java @@ -189,10 +189,10 @@ public class DataFormatConvertersTest { test(new RowTypeInfo(simpleTypes), new Row(simpleTypes.length)); test(new RowTypeInfo(simpleTypes), Row.ofKind(RowKind.DELETE, simpleValues)); test( - InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()), + 
InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()), GenericRowData.of(StringData.fromString("hehe"), 111)); test( - InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()), + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()), GenericRowData.of(null, null)); test(new DecimalDataTypeInfo(10, 5), null); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java index 8995185..1989701 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/aggregate/window/SlicingWindowAggOperatorTest.java @@ -110,8 +110,7 @@ public class SlicingWindowAggOperatorTest { private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor( - OUTPUT_TYPES, - new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE)); @Test public void testEventTimeHoppingWindows() throws Exception { diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java index 99771bc..cdac040 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/ProcTimeDeduplicateFunctionTestBase.java @@ -37,8 +37,7 @@ 
abstract class ProcTimeDeduplicateFunctionTestBase { Time minTime = Time.milliseconds(10); InternalTypeInfo<RowData> inputRowType = - InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType()); + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType()); int rowKeyIdx = 1; RowDataKeySelector rowKeySelector = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java index 2a6cc8f..bec4ba1 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/RowTimeDeduplicateFunctionTestBase.java @@ -40,8 +40,7 @@ abstract class RowTimeDeduplicateFunctionTestBase { protected final long miniBatchSize = 4L; protected Time minTtlTime = Time.milliseconds(10); protected InternalTypeInfo inputRowType = - InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType()); + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType(), new BigIntType()); protected TypeSerializer<RowData> serializer = inputRowType.toSerializer(); protected int rowTimeIndex = 2; protected int rowKeyIndex = 0; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java index 9c91345..386fd28 100644 --- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/deduplicate/window/RowTimeWindowDeduplicateOperatorTest.java @@ -84,8 +84,7 @@ public class RowTimeWindowDeduplicateOperatorTest { private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor( - OUTPUT_TYPES, - new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE)); private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java index 99f93fd..f73d224 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/RandomSortMergeInnerJoinTest.java @@ -261,13 +261,13 @@ public class RandomSortMergeInnerJoinTest { boolean input1First) throws Exception { InternalTypeInfo<RowData> typeInfo = - InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE); InternalTypeInfo<RowData> joinedInfo = InternalTypeInfo.ofFields( new IntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new IntType(), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE); final TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> testHarness = new TwoInputStreamTaskTestHarness<>( 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java index 88c06bc..31eb491 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2HashJoinOperatorTest.java @@ -48,15 +48,13 @@ import java.util.concurrent.LinkedBlockingQueue; public class String2HashJoinOperatorTest implements Serializable { private InternalTypeInfo<RowData> typeInfo = - InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); private InternalTypeInfo<RowData> joinedInfo = InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE); private transient TwoInputStreamTaskTestHarness<BinaryRowData, BinaryRowData, JoinedRowData> testHarness; private ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -354,6 +352,6 @@ public class String2HashJoinOperatorTest implements Serializable { 20, 10000, 10000, - RowType.of(new VarCharType(VarCharType.MAX_LENGTH))); + RowType.of(VarCharType.STRING_TYPE)); } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java index 6a6c5060..719cecf 100644 --- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/String2SortMergeJoinOperatorTest.java @@ -60,15 +60,13 @@ public class String2SortMergeJoinOperatorTest { private boolean leftIsSmall; InternalTypeInfo<RowData> typeInfo = - InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); private InternalTypeInfo<RowData> joinedInfo = InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE); public String2SortMergeJoinOperatorTest(boolean leftIsSmall) { this.leftIsSmall = leftIsSmall; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java index 6f92a34..034fe71 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/TimeIntervalStreamJoinTestBase.java @@ -28,14 +28,14 @@ import org.apache.flink.table.types.logical.VarCharType; /** Base Test for all subclass of {@link TimeIntervalJoin}. 
*/ abstract class TimeIntervalStreamJoinTestBase { InternalTypeInfo<RowData> rowType = - InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE); private InternalTypeInfo<RowData> outputRowType = InternalTypeInfo.ofFields( new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE); RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes()); protected String funcCode = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java index cc839c6..ff4e801 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalProcessTimeJoinOperatorTest.java @@ -43,7 +43,7 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato private int keyIdx = 0; private InternalTypeInfo<RowData> rowType = - InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE); private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector( new int[] {keyIdx}, rowType.toRowFieldTypes()); @@ -51,9 +51,9 @@ public class TemporalProcessTimeJoinOperatorTest extends TemporalTimeJoinOperato private InternalTypeInfo<RowData> outputRowType = InternalTypeInfo.ofFields( new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new BigIntType(), - new 
VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE); private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes()); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java index 924e30e..8ba7a87 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/TemporalTimeJoinOperatorTestBase.java @@ -46,17 +46,15 @@ abstract class TemporalTimeJoinOperatorTestBase { new GeneratedJoinCondition("TimeTemporalJoinCondition", funcCode, new Object[0]); protected InternalTypeInfo<RowData> rowType = InternalTypeInfo.ofFields( - new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)); + new BigIntType(), VarCharType.STRING_TYPE, VarCharType.STRING_TYPE); protected InternalTypeInfo<RowData> outputRowType = InternalTypeInfo.ofFields( new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE, new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE, + VarCharType.STRING_TYPE); protected RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(outputRowType.toRowFieldTypes()); protected int keyIdx = 1; diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java index 
3b7d093..762d434 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperatorTest.java @@ -50,14 +50,14 @@ import static org.junit.Assert.assertEquals; public class WindowJoinOperatorTest { private static final InternalTypeInfo<RowData> INPUT_ROW_TYPE = - InternalTypeInfo.ofFields(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE); private static final InternalTypeInfo<RowData> OUTPUT_ROW_TYPE = InternalTypeInfo.ofFields( new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE); private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(OUTPUT_ROW_TYPE.toRowFieldTypes()); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java index 94d337c..a9718fc 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/ProcTimeRangeBoundedPrecedingFunctionTest.java @@ -50,7 +50,7 @@ public class ProcTimeRangeBoundedPrecedingFunctionTest { private LogicalType[] inputFieldTypes = new LogicalType[] { - new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), + VarCharType.STRING_TYPE, new BigIntType(), }; private LogicalType[] accTypes = new LogicalType[] {new BigIntType()}; diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java index 689dec6..3f85eee 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/over/RowTimeOverWindowTestBase.java @@ -42,9 +42,7 @@ public class RowTimeOverWindowTestBase { }; protected LogicalType[] inputFieldTypes = - new LogicalType[] { - new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new BigIntType() - }; + new LogicalType[] {VarCharType.STRING_TYPE, new BigIntType(), new BigIntType()}; protected LogicalType[] accTypes = new LogicalType[] {new BigIntType()}; protected RowDataKeySelector keySelector = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java index aad99bc..c38e12c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/TopNFunctionTestBase.java @@ -58,8 +58,7 @@ abstract class TopNFunctionTestBase { long cacheSize = 10000L; InternalTypeInfo<RowData> inputRowType = - InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), new BigIntType(), new IntType()); + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new BigIntType(), new IntType()); static GeneratedRecordComparator generatedSortKeyComparator = new GeneratedRecordComparator("", "", new Object[0]) { @@ -107,10 +106,7 @@ abstract class TopNFunctionTestBase { private InternalTypeInfo<RowData> 
outputTypeWithRowNumber = InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), - new BigIntType(), - new IntType(), - new BigIntType()); + VarCharType.STRING_TYPE, new BigIntType(), new IntType(), new BigIntType()); RowDataHarnessAssertor assertorWithoutRowNumber = new RowDataHarnessAssertor( diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java index 087cc4c..60f686e 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/rank/window/WindowRankOperatorTest.java @@ -106,8 +106,7 @@ public class WindowRankOperatorTest { private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor( - OUTPUT_TYPES, - new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + OUTPUT_TYPES, new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE)); private static final LogicalType[] OUTPUT_TYPES_WITHOUT_RANK_NUMBER = new LogicalType[] {new VarCharType(Integer.MAX_VALUE), new IntType(), new BigIntType()}; @@ -118,7 +117,7 @@ public class WindowRankOperatorTest { private static final RowDataHarnessAssertor ASSERTER_WITHOUT_RANK_NUMBER = new RowDataHarnessAssertor( OUTPUT_TYPES_WITHOUT_RANK_NUMBER, - new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE)); private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC"); private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai"); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java index e42a1ec..fa29703 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/ProcTimeSortOperatorTest.java @@ -43,10 +43,7 @@ public class ProcTimeSortOperatorTest { private InternalTypeInfo<RowData> inputRowType = InternalTypeInfo.ofFields( - new IntType(), - new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), - new IntType()); + new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType()); private GeneratedRecordComparator gComparator = new GeneratedRecordComparator("", "", new Object[0]) { diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java index 7acaaa5..53cf10b 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/RowTimeSortOperatorTest.java @@ -46,10 +46,7 @@ public class RowTimeSortOperatorTest { public void testSortOnTwoFields() throws Exception { InternalTypeInfo<RowData> inputRowType = InternalTypeInfo.ofFields( - new IntType(), - new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), - new IntType()); + new IntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType()); // Note: RowTimeIdx must be 0 in product environment, the value is 1 here just for simplify // the testing @@ -134,10 +131,7 @@ public class RowTimeSortOperatorTest { public void testOnlySortOnRowTime() throws Exception { InternalTypeInfo<RowData> inputRowType = 
InternalTypeInfo.ofFields( - new BigIntType(), - new BigIntType(), - new VarCharType(VarCharType.MAX_LENGTH), - new IntType()); + new BigIntType(), new BigIntType(), VarCharType.STRING_TYPE, new IntType()); int rowTimeIdx = 0; RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(inputRowType.toRowFieldTypes()); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java index e535ede..5d4879c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperatorTest.java @@ -40,7 +40,7 @@ import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord public class StreamSortOperatorTest { private InternalTypeInfo<RowData> inputRowType = - InternalTypeInfo.ofFields(new VarCharType(VarCharType.MAX_LENGTH), new IntType()); + InternalTypeInfo.ofFields(VarCharType.STRING_TYPE, new IntType()); private GeneratedRecordComparator sortKeyComparator = new GeneratedRecordComparator("", "", new Object[0]) { diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java index e8308f8..76db205 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorContractTest.java @@ -195,8 +195,7 @@ public class WindowOperatorContractTest { long allowedLateness) throws Exception { - LogicalType[] inputTypes = 
- new LogicalType[] {new VarCharType(VarCharType.MAX_LENGTH), new IntType()}; + LogicalType[] inputTypes = new LogicalType[] {VarCharType.STRING_TYPE, new IntType()}; RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, inputTypes); TypeInformation<RowData> keyType = keySelector.getProducedType(); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java index ed0e75c..6235352 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/WindowOperatorTest.java @@ -123,13 +123,11 @@ public class WindowOperatorTest { private static AtomicInteger closeCalled = new AtomicInteger(0); private LogicalType[] inputFieldTypes = - new LogicalType[] { - new VarCharType(VarCharType.MAX_LENGTH), new IntType(), new BigIntType() - }; + new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), new BigIntType()}; private InternalTypeInfo<RowData> outputType = InternalTypeInfo.ofFields( - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new BigIntType(), new BigIntType(), new BigIntType(), @@ -147,7 +145,7 @@ public class WindowOperatorTest { private RowDataHarnessAssertor assertor = new RowDataHarnessAssertor( outputType.toRowFieldTypes(), - new GenericRowRecordSortComparator(0, new VarCharType(VarCharType.MAX_LENGTH))); + new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE)); private ConcurrentLinkedQueue<Object> doubleRecord( boolean isDouble, StreamRecord<RowData> record) { @@ -1543,8 +1541,7 @@ public class WindowOperatorTest { RowDataHarnessAssertor assertor = new RowDataHarnessAssertor( outputType.toRowFieldTypes(), - new GenericRowRecordSortComparator( 
- 0, new VarCharType(VarCharType.MAX_LENGTH))); + new GenericRowRecordSortComparator(0, VarCharType.STRING_TYPE)); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java index d8eb34c..3bf9e4c 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/types/DataTypePrecisionFixerTest.java @@ -82,7 +82,7 @@ public class DataTypePrecisionFixerTest { .logicalType(new LocalZonedTimestampType(2)) .expect(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(2)), TestSpecs.fix(Types.STRING) - .logicalType(new VarCharType(VarCharType.MAX_LENGTH)) + .logicalType(VarCharType.STRING_TYPE) .expect(DataTypes.STRING()), // nested diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java index 1e0aa14..d87a5f0 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/typeutils/RowDataSerializerTest.java @@ -94,7 +94,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> { private static Object[] testRowDataSerializer() { InternalTypeInfo<RowData> typeInfo = - InternalTypeInfo.ofFields(new IntType(), new VarCharType(VarCharType.MAX_LENGTH)); + InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE); GenericRowData row1 = new GenericRowData(2); row1.setField(0, 1); row1.setField(1, fromString("a")); @@ -122,7 +122,7 
@@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> { new IntType(), new IntType(), new IntType(), - new VarCharType(VarCharType.MAX_LENGTH)); + VarCharType.STRING_TYPE); GenericRowData row = new GenericRowData(13); row.setField(0, 2); @@ -147,7 +147,7 @@ public class RowDataSerializerTest extends SerializerTestInstance<RowData> { InternalTypeInfo.ofFields( new IntType(), new DoubleType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new ArrayType(new IntType()), new MapType(new IntType(), new IntType())); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java index a599f29..9d50c32 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java @@ -53,7 +53,7 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase { static final LogicalType[] KEY_TYPES = new LogicalType[] { new IntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new DoubleType(), new BigIntType(), new BooleanType(), diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java index 22dca2d..c4a5d63 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesMultiMapTestBase.java @@ -50,7 
+50,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase { static final LogicalType[] KEY_TYPES = new LogicalType[] { new IntType(), - new VarCharType(VarCharType.MAX_LENGTH), + VarCharType.STRING_TYPE, new DoubleType(), new BigIntType(), new BooleanType(), @@ -60,7 +60,7 @@ public abstract class BytesMultiMapTestBase<K> extends BytesMapTestBase { static final LogicalType[] VALUE_TYPES = new LogicalType[] { - new VarCharType(VarCharType.MAX_LENGTH), new IntType(), + VarCharType.STRING_TYPE, new IntType(), }; protected final PagedTypeSerializer<K> keySerializer;