This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new fcc7bc2e5e5 [FLINK-33523][table-planner] Revert [FLINK-31835] Fix the array type that can't be converted from the external primitive array" fcc7bc2e5e5 is described below commit fcc7bc2e5e529b135aee28d732b8c5a6769afdae Author: Martijn Visser <2989614+martijnvis...@users.noreply.github.com> AuthorDate: Thu Dec 7 15:16:37 2023 +0100 [FLINK-33523][table-planner] Revert [FLINK-31835] Fix the array type that can't be converted from the external primitive array" This reverts commit a6adbdda --- .../flink/table/types/CollectionDataType.java | 24 +-------- .../org/apache/flink/table/types/DataTypeTest.java | 14 ----- .../planner/functions/CastFunctionITCase.java | 2 +- .../planner/runtime/stream/sql/FunctionITCase.java | 60 ---------------------- .../planner/runtime/stream/table/ValuesITCase.java | 18 +------ .../table/data/DataStructureConvertersTest.java | 24 +-------- .../flink/table/test/TableAssertionTest.java | 2 +- 7 files changed, 7 insertions(+), 137 deletions(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java index 9188530a4ff..239e36eb201 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java @@ -26,8 +26,6 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.util.Preconditions; -import org.apache.commons.lang3.ClassUtils; - import javax.annotation.Nullable; import java.lang.reflect.Array; @@ -120,27 +118,9 @@ public final class CollectionDataType extends DataType { // arrays are a special case because their default conversion class depends on the // conversion class of the element type if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) { - Class<?> conversionClass = - wrapOrUnWrap( - elementDataType.getConversionClass(), - elementDataType.getLogicalType().isNullable()); - - return Array.newInstance(conversionClass, 0).getClass(); - } - return wrapOrUnWrap(clazz, elementDataType.getLogicalType().isNullable()); - } - - private static Class<?> wrapOrUnWrap(@Nullable Class<?> source, boolean nullable) { - if (source == null) { - return null; - } - if (nullable) { - return source.isPrimitive() ? ClassUtils.primitiveToWrapper(source) : source; - } else { - return ClassUtils.isPrimitiveWrapper(source) - ? ClassUtils.wrapperToPrimitive(source) - : source; + return Array.newInstance(elementDataType.getConversionClass(), 0).getClass(); } + return clazz; } private DataType updateInnerDataType(DataType elementDataType) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java index ce9239266f2..a7d824009bf 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java @@ -24,8 +24,6 @@ import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; -import org.apache.flink.table.types.inference.TypeTransformations; -import org.apache.flink.table.types.utils.DataTypeUtils; import org.junit.jupiter.api.Test; @@ -222,16 +220,4 @@ class DataTypeTest { assertThat(DataType.getFields(ARRAY(INT()))).isEmpty(); assertThat(DataType.getFields(INT())).isEmpty(); } - - @Test - void testArrayConversionClass() { - assertThat(DataTypes.ARRAY(INT())).hasConversionClass(Integer[].class); - assertThat(DataTypes.ARRAY(INT().notNull())).hasConversionClass(int[].class); - DataType type = DataTypes.ARRAY(INT()); - assertThat(DataTypeUtils.transform(type, TypeTransformations.toNullable())) - .hasConversionClass(Integer[].class); - type = DataTypes.ARRAY(INT()).bridgedTo(int[].class); - assertThat(DataTypeUtils.transform(type, TypeTransformations.toNullable())) - .hasConversionClass(int[].class); - } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java index 048890ece7f..be0fd373d69 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionITCase.java @@ -1210,7 +1210,7 @@ public class CastFunctionITCase extends BuiltInFunctionTestBase { new Long[] {1L, null, 2L}) .build(), CastTestSpecBuilder.testCastTo(ARRAY(BIGINT().notNull())) - .fromCase(ARRAY(INT().notNull()), new Integer[] {1, 2}, new long[] {1L, 2L}) + .fromCase(ARRAY(INT().notNull()), new Integer[] {1, 2}, new Long[] {1L, 2L}) .build(), CastTestSpecBuilder.testCastTo(ARRAY(ROW(INT(), STRING()).notNull())) .fromCase( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java index 7b0f9d3c15f..628c2f10e73 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java @@ -1336,50 +1336,6 @@ public class FunctionITCase extends StreamingTestBase { "drop function lowerUdf"); } - @Test - void testArrayWithPrimitiveType() { - List<Row> sourceData = Arrays.asList(Row.of(1, 2), Row.of(3, 4)); - TestCollectionTableFactory.reset(); - TestCollectionTableFactory.initData(sourceData); - - tEnv().executeSql( - "CREATE TABLE SourceTable(i INT NOT NULL, j INT NOT NULL) WITH ('connector' = 'COLLECTION')"); - tEnv().executeSql( - "CREATE FUNCTION row_of_array AS '" - + RowOfArrayWithIntFunction.class.getName() - + "'"); - List<Row> rows = - CollectionUtil.iteratorToList( - tEnv().executeSql("SELECT row_of_array(i, j) FROM SourceTable").collect()); - assertThat(rows) - .isEqualTo( - Arrays.asList( - Row.of(Row.of((Object) new int[] {1, 2})), - Row.of(Row.of((Object) new int[] {3, 4})))); - } - - @Test - void testArrayWithPrimitiveBoxedType() { - List<Row> sourceData = Arrays.asList(Row.of(1, null), Row.of(3, null)); - TestCollectionTableFactory.reset(); - TestCollectionTableFactory.initData(sourceData); - - tEnv().executeSql( - "CREATE TABLE SourceTable(i INT NOT NULL, j INT) WITH ('connector' = 'COLLECTION')"); - tEnv().executeSql( - "CREATE FUNCTION row_of_array AS '" - + RowOfArrayWithIntegerFunction.class.getName() - + "'"); - List<Row> rows = - CollectionUtil.iteratorToList( - tEnv().executeSql("SELECT row_of_array(i, j) FROM SourceTable").collect()); - assertThat(rows) - .isEqualTo( - Arrays.asList( - Row.of(Row.of((Object) new Integer[] {1, null})), - Row.of(Row.of((Object) new Integer[] {3, null})))); - } - // -------------------------------------------------------------------------------------------- // Test functions // -------------------------------------------------------------------------------------------- @@ -1807,22 +1763,6 @@ public class FunctionITCase extends StreamingTestBase { } } - /** A function with Row of array with int as return type for test FLINK-31835. */ - public static class RowOfArrayWithIntFunction extends ScalarFunction { - @DataTypeHint("Row<t ARRAY<INT NOT NULL>>") - public Row eval(int... v) { - return Row.of((Object) v); - } - } - - /** A function with Row of array with integer as return type for test FLINK-31835. */ - public static class RowOfArrayWithIntegerFunction extends ScalarFunction { - @DataTypeHint("Row<t ARRAY<INT>>") - public Row eval(Integer... v) { - return Row.of((Object) v); - } - } - private interface FunctionCreator { void createFunction(TableEnvironment environment); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java index f03c3815a51..1087bd85af0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/ValuesITCase.java @@ -332,23 +332,7 @@ class ValuesITCase extends StreamingTestBase { mapData.put(1, 1); mapData.put(2, 2); - Row row = Row.of(mapData, Row.of(1, 2, 3), new int[] {1, 2}); - Table values = tEnv().fromValues(Collections.singletonList(row)); - tEnv().createTemporaryView("values_t", values); - List<Row> results = - CollectionUtil.iteratorToList( - tEnv().executeSql("select * from values_t").collect()); - - assertThat(results).containsExactly(row); - } - - @Test - void testArrayWithNullablePrimitiveType() { - Map<Integer, Integer> mapData = new HashMap<>(); - mapData.put(1, 1); - mapData.put(2, 2); - - Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2, null}); + Row row = Row.of(mapData, Row.of(1, 2, 3), new Integer[] {1, 2}); Table values = tEnv().fromValues(Collections.singletonList(row)); tEnv().createTemporaryView("values_t", values); List<Row> results = diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java index f83a74a312f..9662d7179b3 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/DataStructureConvertersTest.java @@ -196,14 +196,6 @@ public class DataStructureConvertersTest { 1, 2, 3, 4))), // test List that is not backed by an array - // test for Array with default conversion class - TestSpec.forDataType(ARRAY(INT().notNull())) - .disableBridging() - .convertedTo(int[].class, new int[] {1, 2, 3, 4}), - TestSpec.forDataType(ARRAY(INT())) - .disableBridging() - .convertedTo(Integer[].class, new Integer[] {1, 2, 3, 4}), - // arrays of TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE are skipped for // simplicity @@ -376,14 +368,10 @@ public class DataStructureConvertersTest { @Test public void testConversions() { for (Map.Entry<Class<?>, Object> from : testSpec.conversions.entrySet()) { - DataType fromDataType = testSpec.dataType; - if (testSpec.bridgeToTargetClass) { - fromDataType = testSpec.dataType.bridgedTo(from.getKey()); - } + final DataType fromDataType = testSpec.dataType.bridgedTo(from.getKey()); if (testSpec.expectedErrorMessage != null) { - final DataType type = fromDataType; - assertThatThrownBy(() -> DataStructureConverters.getConverter(type)) + assertThatThrownBy(() -> DataStructureConverters.getConverter(fromDataType)) .isInstanceOf(TableException.class) .hasMessage(testSpec.expectedErrorMessage); } else { @@ -426,8 +414,6 @@ public class DataStructureConvertersTest { private final Map<Class<?>, Object> conversionsWithAnotherValue; - private boolean bridgeToTargetClass; - private @Nullable String expectedErrorMessage; private TestSpec(String description, DataType dataType) { @@ -435,7 +421,6 @@ public class DataStructureConvertersTest { this.dataType = dataType; this.conversions = new LinkedHashMap<>(); this.conversionsWithAnotherValue = new LinkedHashMap<>(); - this.bridgeToTargetClass = true; } static TestSpec forDataType(AbstractDataType<?> dataType) { @@ -468,11 +453,6 @@ public class DataStructureConvertersTest { return this; } - TestSpec disableBridging() { - this.bridgeToTargetClass = false; - return this; - } - @Override public String toString() { return description; diff --git a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java index df553f163f2..8ad96c2262b 100644 --- a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java +++ b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/test/TableAssertionTest.java @@ -57,7 +57,7 @@ class TableAssertionTest { BinaryRowData binaryRowData = new RowDataSerializer((RowType) dataType.getLogicalType()) .toBinaryRow(genericRowData); - Row row = Row.of(10, "my string", new boolean[] {true, false}); + Row row = Row.of(10, "my string", new Boolean[] {true, false}); // Test equality with RowData assertThat(binaryRowData)