This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c38ffac [FLINK-25228][table-test-utils] Introduce flink-table-test-utils c38ffac is described below commit c38ffac7c5ac5f7ea3ff2511ef71ae898d74c554 Author: slinkydeveloper <francescogu...@gmail.com> AuthorDate: Wed Dec 15 15:10:07 2021 +0100 [FLINK-25228][table-test-utils] Introduce flink-table-test-utils This closes #18255. --- flink-table/README.md | 4 + .../apache/flink/table/test/ArrayDataAssert.java | 65 ++++++++ .../apache/flink/table/test/DataTypeAssert.java | 2 + .../flink/table/test/DataTypeConditions.java | 2 + .../apache/flink/table/test/InternalDataUtils.java | 165 +++++++++++++++++++++ .../apache/flink/table/test/LogicalTypeAssert.java | 2 + .../flink/table/test/LogicalTypeConditions.java | 2 + .../org/apache/flink/table/test/MapDataAssert.java | 65 ++++++++ .../test/{StringDataAssert.java => RowAssert.java} | 27 ++-- .../org/apache/flink/table/test/RowDataAssert.java | 32 ++++ .../apache/flink/table/test/RowDataListAssert.java | 80 ++++++++++ .../apache/flink/table/test/StringDataAssert.java | 2 + .../apache/flink/table/test/TableAssertions.java | 82 ++++++++++ .../planner/functions/casting/CastRulesTest.java | 26 +--- flink-table/flink-table-test-utils/pom.xml | 124 ++++++++++++++++ .../src/test/java/TableAssertionTest.java | 75 ++++++++++ flink-table/pom.xml | 1 + tools/ci/stage.sh | 3 +- 18 files changed, 726 insertions(+), 33 deletions(-) diff --git a/flink-table/README.md b/flink-table/README.md index b676673..92026fc 100644 --- a/flink-table/README.md +++ b/flink-table/README.md @@ -59,6 +59,10 @@ If you want to use Table API & SQL, check out the [documentation](https://nightl * `flink-sql-client`: CLI tool to submit queries to a Flink cluster +### Testing + +* `flink-table-test-utils`: Brings in transitively all the dependencies you need to execute Table pipelines and provides some test utilities such as assertions, mocks and test harnesses. + ### Notes No module except `flink-table-planner` should depend on `flink-table-runtime` in production classpath, diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/ArrayDataAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/ArrayDataAssert.java new file mode 100644 index 0000000..ee69280 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/ArrayDataAssert.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.assertj.core.api.AbstractAssert; + +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Assertions for {@link ArrayData}. */ +@Experimental +public class ArrayDataAssert extends AbstractAssert<ArrayDataAssert, ArrayData> { + + public ArrayDataAssert(ArrayData arrayData) { + super(arrayData, ArrayDataAssert.class); + } + + public ArrayDataAssert hasSize(int size) { + isNotNull(); + assertThat(this.actual.size()).isEqualTo(size); + return this; + } + + public ArrayDataAssert asGeneric(DataType dataType) { + return asGeneric(dataType.getLogicalType()); + } + + public ArrayDataAssert asGeneric(LogicalType logicalType) { + GenericArrayData actual = InternalDataUtils.toGenericArray(this.actual, logicalType); + return new ArrayDataAssert(actual) + .usingComparator( + (x, y) -> { + // Avoid converting actual again + x = x == actual ? x : InternalDataUtils.toGenericArray(x, logicalType); + y = y == actual ? y : InternalDataUtils.toGenericArray(y, logicalType); + if (Objects.equals(x, y)) { + return 0; + } + return Objects.hashCode(x) < Objects.hashCode(y) ? -1 : 1; + }); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java index 23876e0..3c98b95 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java @@ -18,6 +18,7 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; @@ -28,6 +29,7 @@ import org.assertj.core.api.ListAssert; import static org.assertj.core.api.Assertions.not; /** Assertions for {@link DataType}. */ +@Experimental public class DataTypeAssert extends AbstractAssert<DataTypeAssert, DataType> { public DataTypeAssert(DataType dataType) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java index 08b7e08..c282127 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java @@ -18,12 +18,14 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.assertj.core.api.Condition; /** Set of conditions to test properties of {@link DataType}. */ +@Experimental public class DataTypeConditions { private DataTypeConditions() {} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/InternalDataUtils.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/InternalDataUtils.java new file mode 100644 index 0000000..8abd316 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/InternalDataUtils.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test; + +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.MultisetType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Assertions; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.function.Function; + +class InternalDataUtils { + + static Object toGenericInternalData(Object object, LogicalType logicalType) { + if (object instanceof RowData) { + return toGenericRow((RowData) object, logicalType); + } else if (object instanceof ArrayData) { + return toGenericArray((ArrayData) object, logicalType); + } else if (object instanceof MapData) { + return toGenericMap((MapData) object, logicalType); + } + return object; + } + + static GenericRowData toGenericRow(RowData rowData, LogicalType logicalType) { + final List<LogicalType> fieldTypes = LogicalTypeChecks.getFieldTypes(logicalType); + + final GenericRowData row = new GenericRowData(fieldTypes.size()); + row.setRowKind(rowData.getRowKind()); + + for (int i = 0; i < fieldTypes.size(); i++) { + if (rowData.isNullAt(i)) { + row.setField(i, null); + } else { + LogicalType fieldType = fieldTypes.get(i); + RowData.FieldGetter fieldGetter = RowData.createFieldGetter(fieldType, i); + row.setField( + i, toGenericInternalData(fieldGetter.getFieldOrNull(rowData), fieldType)); + } + } + return row; + } + + static GenericArrayData toGenericArray(ArrayData arrayData, LogicalType logicalType) { + final LogicalType innerElement = ((ArrayType) logicalType).getElementType(); + final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(innerElement); + + final Object[] newArray = new Object[arrayData.size()]; + + for (int i = 0; i < arrayData.size(); i++) { + if (arrayData.isNullAt(i)) { + newArray[i] = null; + } else { + newArray[i] = + toGenericInternalData( + elementGetter.getElementOrNull(arrayData, i), innerElement); + } + } + return new GenericArrayData(newArray); + } + + static GenericMapData toGenericMap(MapData mapData, LogicalType logicalType) { + final LogicalType keyType = + logicalType.is(LogicalTypeRoot.MULTISET) + ? ((MultisetType) logicalType).getElementType() + : ((MapType) logicalType).getKeyType(); + final LogicalType valueType = + logicalType.is(LogicalTypeRoot.MULTISET) + ? new IntType(false) + : ((MapType) logicalType).getValueType(); + + final ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType); + final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(keyType); + + final ArrayData keys = mapData.keyArray(); + final ArrayData values = mapData.valueArray(); + + final LinkedHashMap<Object, Object> newMap = new LinkedHashMap<>(); + + for (int i = 0; i < mapData.size(); i++) { + Object key = null; + Object value = null; + if (!keys.isNullAt(i)) { + key = toGenericInternalData(keyGetter.getElementOrNull(keys, i), keyType); + } + if (!values.isNullAt(i)) { + value = toGenericInternalData(valueGetter.getElementOrNull(values, i), valueType); + } + newMap.put(key, value); + } + return new GenericMapData(newMap); + } + + static Function<RowData, Row> resolveToExternalOrNull(DataType dataType) { + try { + // Create the converter + Method getConverter = + Class.forName("org.apache.flink.table.data.conversion.DataStructureConverters") + .getMethod("getConverter", DataType.class); + Object converter = getConverter.invoke(null, dataType); + + // Open the converter + converter + .getClass() + .getMethod("open", ClassLoader.class) + .invoke(converter, Thread.currentThread().getContextClassLoader()); + Method toExternalOrNull = + converter.getClass().getMethod("toExternalOrNull", Object.class); + + // Return the lambda to invoke the converter + return rowData -> { + try { + return (Row) toExternalOrNull.invoke(converter, rowData); + } catch (IllegalAccessException | InvocationTargetException e) { + Assertions.fail( + "Something went wrong when trying to use the DataStructureConverter from flink-table-runtime", + e); + return null; // For the compiler + } + }; + } catch (ClassNotFoundException + | InvocationTargetException + | NoSuchMethodException + | IllegalAccessException e) { + Assertions.fail( + "Error when trying to use the RowData to Row conversion. " + + "Perhaps you miss flink-table-runtime in your test classpath?", + e); + return null; // For the compiler + } + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java index eb2e366..771e117 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java @@ -18,6 +18,7 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; import org.apache.flink.table.api.TableException; import org.apache.flink.table.types.LogicalTypesTest; import org.apache.flink.table.types.logical.DecimalType; @@ -37,6 +38,7 @@ import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assertions.not; /** Aseertions for {@link LogicalType}. */ +@Experimental public class LogicalTypeAssert extends AbstractAssert<LogicalTypeAssert, LogicalType> { public LogicalTypeAssert(LogicalType logicalType) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java index 7c8983b..bc7905c 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java @@ -18,11 +18,13 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; import org.apache.flink.table.types.logical.LogicalType; import org.assertj.core.api.Condition; /** Set of conditions to test properties of {@link LogicalType}. */ +@Experimental public class LogicalTypeConditions { private LogicalTypeConditions() {} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/MapDataAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/MapDataAssert.java new file mode 100644 index 0000000..f212082 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/MapDataAssert.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.MapData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.assertj.core.api.AbstractAssert; + +import java.util.Objects; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Assertions for {@link MapData}. */ +@Experimental +public class MapDataAssert extends AbstractAssert<MapDataAssert, MapData> { + + public MapDataAssert(MapData mapData) { + super(mapData, MapDataAssert.class); + } + + public MapDataAssert hasSize(int size) { + isNotNull(); + assertThat(this.actual.size()).isEqualTo(size); + return this; + } + + public MapDataAssert asGeneric(DataType dataType) { + return asGeneric(dataType.getLogicalType()); + } + + public MapDataAssert asGeneric(LogicalType logicalType) { + GenericMapData actual = InternalDataUtils.toGenericMap(this.actual, logicalType); + return new MapDataAssert(actual) + .usingComparator( + (x, y) -> { + // Avoid converting actual again + x = x == actual ? x : InternalDataUtils.toGenericMap(x, logicalType); + y = y == actual ? y : InternalDataUtils.toGenericMap(y, logicalType); + if (Objects.equals(x, y)) { + return 0; + } + return Objects.hashCode(x) < Objects.hashCode(y) ? -1 : 1; + }); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowAssert.java similarity index 60% copy from flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java copy to flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowAssert.java index d98d124..5cfe3d9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowAssert.java @@ -18,26 +18,31 @@ package org.apache.flink.table.test; -import org.apache.flink.table.data.StringData; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.assertj.core.api.AbstractAssert; -import org.assertj.core.api.ByteArrayAssert; -import org.assertj.core.api.StringAssert; -/** Assertions for {@link StringData}. */ -public class StringDataAssert extends AbstractAssert<StringDataAssert, StringData> { +import static org.assertj.core.api.Assertions.assertThat; - public StringDataAssert(StringData stringData) { - super(stringData, StringDataAssert.class); +/** Assertions for {@link Row}. */ +@Experimental +public class RowAssert extends AbstractAssert<RowAssert, Row> { + + public RowAssert(Row row) { + super(row, RowAssert.class); } - public StringAssert asString() { + public RowAssert hasKind(RowKind kind) { isNotNull(); - return new StringAssert(this.actual.toString()); + assertThat(this.actual.getKind()).isEqualTo(kind); + return this; } - public ByteArrayAssert asBytes() { + public RowAssert hasArity(int arity) { isNotNull(); - return new ByteArrayAssert(this.actual.toBytes()); + assertThat(this.actual.getArity()).isEqualTo(arity); + return this; } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java index 67e5666..4ec3ad3 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java @@ -18,16 +18,23 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.RowKind; import org.assertj.core.api.AbstractAssert; import org.assertj.core.api.LongAssert; import org.assertj.core.api.StringAssert; +import java.util.Objects; + import static org.assertj.core.api.Assertions.assertThat; /** Assertions for {@link RowData}. */ +@Experimental public class RowDataAssert extends AbstractAssert<RowDataAssert, RowData> { public RowDataAssert(RowData rowData) { @@ -71,4 +78,29 @@ public class RowDataAssert extends AbstractAssert<RowDataAssert, RowData> { assertThat(this.actual.isNullAt(index)).isFalse(); return this; } + + public RowDataAssert asGeneric(DataType dataType) { + return asGeneric(dataType.getLogicalType()); + } + + public RowDataAssert asGeneric(LogicalType logicalType) { + GenericRowData actual = InternalDataUtils.toGenericRow(this.actual, logicalType); + return new RowDataAssert(actual) + .usingComparator( + (x, y) -> { + // Avoid converting actual again + x = x == actual ? x : InternalDataUtils.toGenericRow(x, logicalType); + y = y == actual ? y : InternalDataUtils.toGenericRow(y, logicalType); + if (Objects.equals(x, y)) { + return 0; + } + return Objects.hashCode(x) < Objects.hashCode(y) ? -1 : 1; + }); + } + + /** In order to execute this assertion, you need flink-table-runtime in the classpath. */ + public RowAssert asRow(DataType dataType) { + return new RowAssert( + InternalDataUtils.resolveToExternalOrNull(dataType).apply(this.actual)); + } } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataListAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataListAssert.java new file mode 100644 index 0000000..7c1ec6e --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataListAssert.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.test; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.Row; + +import org.assertj.core.api.AbstractListAssert; +import org.assertj.core.api.ListAssert; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +/** Assertions for {@link List} of {@link RowData}. */ +@Experimental +public class RowDataListAssert + extends AbstractListAssert<RowDataListAssert, List<RowData>, RowData, RowDataAssert> { + + public RowDataListAssert(List<RowData> rowDataList) { + super(rowDataList, RowDataListAssert.class); + } + + @Override + protected RowDataAssert toAssert(RowData value, String description) { + return new RowDataAssert(value).as(description); + } + + @Override + @SuppressWarnings({"unchecked"}) + protected RowDataListAssert newAbstractIterableAssert(Iterable<? extends RowData> iterable) { + if (iterable instanceof List) { + return new RowDataListAssert((List<RowData>) iterable); + } + return new RowDataListAssert( + StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList())); + } + + public RowDataListAssert asGeneric(DataType dataType) { + return asGeneric(dataType.getLogicalType()); + } + + public RowDataListAssert asGeneric(LogicalType logicalType) { + return usingElementComparator( + (x, y) -> { + x = InternalDataUtils.toGenericRow(x, logicalType); + y = InternalDataUtils.toGenericRow(y, logicalType); + if (Objects.equals(x, y)) { + return 0; + } + return Objects.hashCode(x) < Objects.hashCode(y) ? -1 : 1; + }); + } + + /** In order to execute this assertion, you need flink-table-runtime in the classpath. */ + public ListAssert<Row> asRows(DataType dataType) { + return new ListAssert<>( + this.actual.stream().map(InternalDataUtils.resolveToExternalOrNull(dataType))); + } +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java index d98d124..ae4b52d 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java @@ -18,6 +18,7 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; import org.apache.flink.table.data.StringData; import org.assertj.core.api.AbstractAssert; @@ -25,6 +26,7 @@ import org.assertj.core.api.ByteArrayAssert; import org.assertj.core.api.StringAssert; /** Assertions for {@link StringData}. */ +@Experimental public class StringDataAssert extends AbstractAssert<StringDataAssert, StringData> { public StringDataAssert(StringData stringData) { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java index 510a86a..2bf36b6 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java @@ -18,26 +18,108 @@ package org.apache.flink.table.test; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.table.data.ArrayData; +import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.Row; + +import org.assertj.core.api.AbstractAssert; +import org.assertj.core.api.Assertions; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** Entrypoint for all the various table assertions. */ +@Experimental public class TableAssertions { private TableAssertions() {} + // --- External data structures + + public static RowAssert assertThat(Row row) { + return new RowAssert(row); + } + // --- Internal data structures + public static ArrayDataAssert assertThat(ArrayData actual) { + return new ArrayDataAssert(actual); + } + + public static MapDataAssert assertThat(MapData actual) { + return new MapDataAssert(actual); + } + public static RowDataAssert assertThat(RowData actual) { return new RowDataAssert(actual); } + public static RowDataListAssert assertThatRows(Iterator<RowData> actual) { + return new RowDataListAssert( + StreamSupport.stream( + Spliterators.spliteratorUnknownSize(actual, Spliterator.ORDERED), + false) + .collect(Collectors.toList())); + } + + public static RowDataListAssert assertThatRows(Iterable<RowData> actual) { + if (actual instanceof List) { + return new RowDataListAssert((List<RowData>) actual); + } + return new RowDataListAssert( + StreamSupport.stream(actual.spliterator(), false).collect(Collectors.toList())); + } + + public static RowDataListAssert assertThatRows(Stream<RowData> actual) { + return new RowDataListAssert(actual.collect(Collectors.toList())); + } + + public static RowDataListAssert assertThatRows(RowData... rows) { + return new RowDataListAssert(Arrays.asList(rows)); + } + public static StringDataAssert assertThat(StringData actual) { return new StringDataAssert(actual); } + /** + * Create an assertion for internal data, converted to Generic data structures supporting + * equality assertions. Use this method when the type can be different. + * + * <p>If you need to assert a data always of the same type, use the other assertions like {@link + * #assertThat(RowData)} instead. + */ + public static AbstractAssert<?, ?> assertThatGenericDataOfType( + Object actual, LogicalType logicalType) { + if (actual instanceof ArrayData) { + return new ArrayDataAssert((ArrayData) actual).asGeneric(logicalType); + } else if (actual instanceof MapData) { + return new MapDataAssert((MapData) actual).asGeneric(logicalType); + } else if (actual instanceof RowData) { + return new RowDataAssert((RowData) actual).asGeneric(logicalType); + } else if (actual instanceof StringData) { + return new StringDataAssert((StringData) actual); + } + + return Assertions.assertThatObject(actual); + } + + public static AbstractAssert<?, ?> assertThatGenericDataOfType( + Object actual, DataType dataType) { + return assertThatGenericDataOfType(actual, dataType.getLogicalType()); + } + // --- Types public static DataTypeAssert assertThat(DataType actual) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java index 301ab4c..48d2b75 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java @@ -26,12 +26,9 @@ import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RawValueData; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.binary.BinaryStringDataUtil; -import org.apache.flink.table.data.conversion.DataStructureConverter; -import org.apache.flink.table.data.conversion.DataStructureConverters; import org.apache.flink.table.data.utils.CastExecutor; import org.apache.flink.table.planner.functions.CastFunctionITCase; import org.apache.flink.table.types.DataType; @@ -90,6 +87,7 @@ import static org.apache.flink.table.api.DataTypes.YEAR; import static org.apache.flink.table.data.DecimalData.fromBigDecimal; import static org.apache.flink.table.data.StringData.fromString; import static org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8; +import static org.apache.flink.table.test.TableAssertions.assertThatGenericDataOfType; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -1267,14 +1265,12 @@ class CastRulesTest { this.inputTypes.add(srcDataType); this.assertionExecutors.add( executor -> { - Object expected = sanitizeTestData(targetType, target); - - assertThat(sanitizeTestData(targetType, executor.cast(src))) - .isEqualTo(expected); - assertThat(sanitizeTestData(targetType, executor.cast(src))) + assertThatGenericDataOfType(executor.cast(src), targetType) + .isEqualTo(target); + assertThatGenericDataOfType(executor.cast(src), targetType) .as( "Error when reusing the rule. Perhaps there is some state that needs to be reset") - .isEqualTo(expected); + .isEqualTo(target); }); this.descriptions.add("{" + src + " => " + target + "}"); this.castContexts.add(castContext); @@ -1319,17 +1315,5 @@ class CastRulesTest { } return Arrays.stream(testSpecs); } - - // This method makes sure that we can correctly compare rows - private Object sanitizeTestData(DataType dataType, Object value) { - if (value instanceof RowData) { - // Convert to GenericRowData using the DataStructureConverter - DataStructureConverter<Object, Object> converter = - DataStructureConverters.getConverter(dataType); - converter.open(Thread.currentThread().getContextClassLoader()); - return converter.toInternalOrNull(converter.toExternalOrNull(value)); - } - return value; - } } } diff --git a/flink-table/flink-table-test-utils/pom.xml b/flink-table/flink-table-test-utils/pom.xml new file mode 100644 index 0000000..28f14ad --- /dev/null +++ b/flink-table/flink-table-test-utils/pom.xml @@ -0,0 +1,124 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table</artifactId> + <version>1.15-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-table-test-utils</artifactId> + <name>Flink : Table : Test Utils</name> + <description> + This module contains test utilities for the Table API/SQL ecosystem for end users, connectors and formats. + </description> + + <packaging>jar</packaging> + + <dependencies> + <!-- Core test utils (MiniCluster) --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-junit</artifactId> + <version>${project.version}</version> + <scope>compile</scope> + </dependency> + + <!-- Table dependencies required to create and run a SQL job --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-loader</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- We bring the assertions from flink-table-common and repackage them --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <type>test-jar</type> + <scope>compile</scope> + <!-- We don't want table-common to be used as transitive dependency --> + <optional>true</optional> + </dependency> + + <!-- Required for the assertions --> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <!-- Exclude all flink-dist files and only include flink-table-* --> + <execution> + <id>shade-flink</id> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <!-- Include from table-common test jar the assertions --> + <artifact>org.apache.flink:flink-table-common:tests</artifact> + <includes> + <include>org/apache/flink/table/test/*.java</include> + </includes> + </filter> + </filters> + <artifactSet> + <includes combine.children="append"> + <include>org.apache.flink:flink-table-common:tests</include> + </includes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/flink-table/flink-table-test-utils/src/test/java/TableAssertionTest.java b/flink-table/flink-table-test-utils/src/test/java/TableAssertionTest.java new file mode 100644 index 0000000..05a54b9 --- /dev/null +++ b/flink-table/flink-table-test-utils/src/test/java/TableAssertionTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.flink.table.test.TableAssertions; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Test; + +import static org.apache.flink.table.api.DataTypes.ARRAY; +import static org.apache.flink.table.api.DataTypes.BOOLEAN; +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.test.TableAssertions.assertThat; +import static org.apache.flink.table.test.TableAssertions.assertThatRows; + +/** Tests for {@link TableAssertions} assertions. */ +class TableAssertionTest { + + @Test + void testAssertRowDataWithConversion() { + DataType dataType = + ROW( + FIELD("a", INT()), + FIELD("b", STRING()), + FIELD("c", ARRAY(BOOLEAN().notNull()))); + + GenericRowData genericRowData = + GenericRowData.of( + 10, + StringData.fromString("my string"), + new GenericArrayData(new boolean[] {true, false})); + BinaryRowData binaryRowData = + new RowDataSerializer((RowType) dataType.getLogicalType()) + .toBinaryRow(genericRowData); + Row row = Row.of(10, "my string", new Boolean[] {true, false}); + + // Test equality with RowData + assertThat(binaryRowData) + .asGeneric(dataType) + .isEqualTo(genericRowData) + .isEqualTo(binaryRowData.copy()); + assertThatRows(binaryRowData) + .asGeneric(dataType) + .containsOnly(genericRowData) + .containsOnly(binaryRowData); + + // Test equality with Row + assertThat(binaryRowData).asRow(dataType).isEqualTo(row); + assertThatRows(binaryRowData).asRows(dataType).containsOnly(row); + } +} diff --git a/flink-table/pom.xml b/flink-table/pom.xml index b8605d2..b091829 100644 --- a/flink-table/pom.xml +++ b/flink-table/pom.xml @@ -47,6 +47,7 @@ under the License. <module>flink-sql-parser</module> <module>flink-sql-parser-hive</module> <module>flink-table-code-splitter</module> + <module>flink-table-test-utils</module> </modules> <dependencyManagement> diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh index 894977f..e28aae1 100755 --- a/tools/ci/stage.sh +++ b/tools/ci/stage.sh @@ -68,7 +68,8 @@ flink-table/flink-sql-client,\ flink-table/flink-table-planner,\ flink-table/flink-table-planner-loader,\ flink-table/flink-table-runtime,\ -flink-table/flink-table-code-splitter" +flink-table/flink-table-code-splitter,\ +flink-table/flink-table-test-utils" MODULES_CONNECTORS="\ flink-contrib/flink-connector-wikiedits,\