This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c38ffac  [FLINK-25228][table-test-utils] Introduce 
flink-table-test-utils
c38ffac is described below

commit c38ffac7c5ac5f7ea3ff2511ef71ae898d74c554
Author: slinkydeveloper <francescogu...@gmail.com>
AuthorDate: Wed Dec 15 15:10:07 2021 +0100

    [FLINK-25228][table-test-utils] Introduce flink-table-test-utils
    
    This closes #18255.
---
 flink-table/README.md                              |   4 +
 .../apache/flink/table/test/ArrayDataAssert.java   |  65 ++++++++
 .../apache/flink/table/test/DataTypeAssert.java    |   2 +
 .../flink/table/test/DataTypeConditions.java       |   2 +
 .../apache/flink/table/test/InternalDataUtils.java | 165 +++++++++++++++++++++
 .../apache/flink/table/test/LogicalTypeAssert.java |   2 +
 .../flink/table/test/LogicalTypeConditions.java    |   2 +
 .../org/apache/flink/table/test/MapDataAssert.java |  65 ++++++++
 .../test/{StringDataAssert.java => RowAssert.java} |  27 ++--
 .../org/apache/flink/table/test/RowDataAssert.java |  32 ++++
 .../apache/flink/table/test/RowDataListAssert.java |  80 ++++++++++
 .../apache/flink/table/test/StringDataAssert.java  |   2 +
 .../apache/flink/table/test/TableAssertions.java   |  82 ++++++++++
 .../planner/functions/casting/CastRulesTest.java   |  26 +---
 flink-table/flink-table-test-utils/pom.xml         | 124 ++++++++++++++++
 .../src/test/java/TableAssertionTest.java          |  75 ++++++++++
 flink-table/pom.xml                                |   1 +
 tools/ci/stage.sh                                  |   3 +-
 18 files changed, 726 insertions(+), 33 deletions(-)

diff --git a/flink-table/README.md b/flink-table/README.md
index b676673..92026fc 100644
--- a/flink-table/README.md
+++ b/flink-table/README.md
@@ -59,6 +59,10 @@ If you want to use Table API & SQL, check out the 
[documentation](https://nightl
 
 * `flink-sql-client`: CLI tool to submit queries to a Flink cluster
 
+### Testing
+
+* `flink-table-test-utils`: Transitively brings in all the dependencies you need to execute Table pipelines and provides test utilities such as assertions, mocks and test harnesses.
+
 ### Notes
 
 No module except `flink-table-planner` should depend on `flink-table-runtime` 
in production classpath, 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/ArrayDataAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/ArrayDataAssert.java
new file mode 100644
index 0000000..ee69280
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/ArrayDataAssert.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.assertj.core.api.AbstractAssert;
+
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Assertions for {@link ArrayData}.
+ *
+ * <p>Use {@link #asGeneric(LogicalType)} to compare arrays by value after converting them to
+ * {@link GenericArrayData}.
+ */
+@Experimental
+public class ArrayDataAssert extends AbstractAssert<ArrayDataAssert, ArrayData> {
+
+    public ArrayDataAssert(ArrayData arrayData) {
+        super(arrayData, ArrayDataAssert.class);
+    }
+
+    /** Verifies that the actual array is not null and has the given number of elements. */
+    public ArrayDataAssert hasSize(int size) {
+        isNotNull();
+        final int actualSize = this.actual.size();
+        assertThat(actualSize).isEqualTo(size);
+        return this;
+    }
+
+    /** Same as {@link #asGeneric(LogicalType)}, using the logical type of the given data type. */
+    public ArrayDataAssert asGeneric(DataType dataType) {
+        final LogicalType logicalType = dataType.getLogicalType();
+        return asGeneric(logicalType);
+    }
+
+    /**
+     * Returns a new assertion on the actual array converted to {@link GenericArrayData}. The
+     * returned assertion compares values through their generic representation.
+     */
+    public ArrayDataAssert asGeneric(LogicalType logicalType) {
+        final GenericArrayData convertedActual =
+                InternalDataUtils.toGenericArray(this.actual, logicalType);
+        return new ArrayDataAssert(convertedActual)
+                .usingComparator(
+                        (left, right) ->
+                                compareAsGeneric(convertedActual, left, right, logicalType));
+    }
+
+    /**
+     * Compares two arrays through their generic representation. Returns 0 only when the generic
+     * forms are equal; otherwise the sign is derived from the hash codes and is never 0.
+     */
+    private static int compareAsGeneric(
+            GenericArrayData convertedActual,
+            ArrayData left,
+            ArrayData right,
+            LogicalType logicalType) {
+        // The already converted actual value is passed through untouched
+        final ArrayData genericLeft =
+                left == convertedActual
+                        ? left
+                        : InternalDataUtils.toGenericArray(left, logicalType);
+        final ArrayData genericRight =
+                right == convertedActual
+                        ? right
+                        : InternalDataUtils.toGenericArray(right, logicalType);
+        if (Objects.equals(genericLeft, genericRight)) {
+            return 0;
+        }
+        return Objects.hashCode(genericLeft) < Objects.hashCode(genericRight) ? -1 : 1;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java
index 23876e0..3c98b95 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeAssert.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 
@@ -28,6 +29,7 @@ import org.assertj.core.api.ListAssert;
 import static org.assertj.core.api.Assertions.not;
 
 /** Assertions for {@link DataType}. */
+@Experimental
 public class DataTypeAssert extends AbstractAssert<DataTypeAssert, DataType> {
 
     public DataTypeAssert(DataType dataType) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java
index 08b7e08..c282127 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/DataTypeConditions.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 
 import org.assertj.core.api.Condition;
 
 /** Set of conditions to test properties of {@link DataType}. */
+@Experimental
 public class DataTypeConditions {
 
     private DataTypeConditions() {}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/InternalDataUtils.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/InternalDataUtils.java
new file mode 100644
index 0000000..8abd316
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/InternalDataUtils.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Utilities to convert internal data structures to their generic counterparts ({@link
+ * GenericRowData}, {@link GenericArrayData}, {@link GenericMapData}) and to convert {@link
+ * RowData} to external {@link Row}.
+ */
+class InternalDataUtils {
+
+    /**
+     * Converts {@link RowData}, {@link ArrayData} and {@link MapData} to their generic
+     * counterparts, recursing into nested values. Any other object is returned unchanged.
+     *
+     * @param object the internal value to convert
+     * @param logicalType the logical type describing {@code object}
+     */
+    static Object toGenericInternalData(Object object, LogicalType logicalType) {
+        if (object instanceof RowData) {
+            return toGenericRow((RowData) object, logicalType);
+        } else if (object instanceof ArrayData) {
+            return toGenericArray((ArrayData) object, logicalType);
+        } else if (object instanceof MapData) {
+            return toGenericMap((MapData) object, logicalType);
+        }
+        return object;
+    }
+
+    /**
+     * Copies the given row into a {@link GenericRowData}, preserving the row kind and converting
+     * every non-null field with {@link #toGenericInternalData(Object, LogicalType)}.
+     *
+     * @param rowData the row to convert
+     * @param logicalType the logical type of the row, used to resolve the field types
+     */
+    static GenericRowData toGenericRow(RowData rowData, LogicalType logicalType) {
+        final List<LogicalType> fieldTypes = LogicalTypeChecks.getFieldTypes(logicalType);
+
+        final GenericRowData row = new GenericRowData(fieldTypes.size());
+        row.setRowKind(rowData.getRowKind());
+
+        for (int i = 0; i < fieldTypes.size(); i++) {
+            if (rowData.isNullAt(i)) {
+                row.setField(i, null);
+            } else {
+                LogicalType fieldType = fieldTypes.get(i);
+                RowData.FieldGetter fieldGetter = RowData.createFieldGetter(fieldType, i);
+                row.setField(
+                        i, toGenericInternalData(fieldGetter.getFieldOrNull(rowData), fieldType));
+            }
+        }
+        return row;
+    }
+
+    /**
+     * Copies the given array into a {@link GenericArrayData}, converting every non-null element
+     * with {@link #toGenericInternalData(Object, LogicalType)}.
+     *
+     * @param arrayData the array to convert
+     * @param logicalType the logical type of the array; it is cast to {@link ArrayType}
+     */
+    static GenericArrayData toGenericArray(ArrayData arrayData, LogicalType logicalType) {
+        final LogicalType innerElement = ((ArrayType) logicalType).getElementType();
+        final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(innerElement);
+
+        final Object[] newArray = new Object[arrayData.size()];
+
+        for (int i = 0; i < arrayData.size(); i++) {
+            if (arrayData.isNullAt(i)) {
+                newArray[i] = null;
+            } else {
+                newArray[i] =
+                        toGenericInternalData(
+                                elementGetter.getElementOrNull(arrayData, i), innerElement);
+            }
+        }
+        return new GenericArrayData(newArray);
+    }
+
+    /**
+     * Copies the given map into a {@link GenericMapData} backed by a {@link LinkedHashMap},
+     * converting every non-null key and value with {@link #toGenericInternalData(Object,
+     * LogicalType)}.
+     *
+     * <p>For a {@code MULTISET} type the keys use the multiset element type and the values are
+     * read as non-nullable {@code INT}; otherwise the type is cast to {@link MapType}.
+     *
+     * @param mapData the map to convert
+     * @param logicalType the logical type of the map or multiset
+     */
+    static GenericMapData toGenericMap(MapData mapData, LogicalType logicalType) {
+        final LogicalType keyType =
+                logicalType.is(LogicalTypeRoot.MULTISET)
+                        ? ((MultisetType) logicalType).getElementType()
+                        : ((MapType) logicalType).getKeyType();
+        final LogicalType valueType =
+                logicalType.is(LogicalTypeRoot.MULTISET)
+                        ? new IntType(false)
+                        : ((MapType) logicalType).getValueType();
+
+        final ArrayData.ElementGetter keyGetter = ArrayData.createElementGetter(keyType);
+        // The value array must be read with an accessor for the value type, not the key type
+        final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
+
+        final ArrayData keys = mapData.keyArray();
+        final ArrayData values = mapData.valueArray();
+
+        final LinkedHashMap<Object, Object> newMap = new LinkedHashMap<>();
+
+        for (int i = 0; i < mapData.size(); i++) {
+            Object key = null;
+            Object value = null;
+            if (!keys.isNullAt(i)) {
+                key = toGenericInternalData(keyGetter.getElementOrNull(keys, i), keyType);
+            }
+            if (!values.isNullAt(i)) {
+                value = toGenericInternalData(valueGetter.getElementOrNull(values, i), valueType);
+            }
+            newMap.put(key, value);
+        }
+        return new GenericMapData(newMap);
+    }
+
+    /**
+     * Creates a function converting {@link RowData} to {@link Row} for the given data type.
+     *
+     * <p>The converter is looked up reflectively through {@code
+     * org.apache.flink.table.data.conversion.DataStructureConverters#getConverter} and opened with
+     * the context class loader of the current thread. Reflection failures are reported through
+     * {@link Assertions#fail(String, Throwable)}.
+     *
+     * @param dataType the data type of the rows to convert
+     */
+    static Function<RowData, Row> resolveToExternalOrNull(DataType dataType) {
+        try {
+            // Create the converter
+            Method getConverter =
+                    Class.forName("org.apache.flink.table.data.conversion.DataStructureConverters")
+                            .getMethod("getConverter", DataType.class);
+            Object converter = getConverter.invoke(null, dataType);
+
+            // Open the converter
+            converter
+                    .getClass()
+                    .getMethod("open", ClassLoader.class)
+                    .invoke(converter, Thread.currentThread().getContextClassLoader());
+            Method toExternalOrNull =
+                    converter.getClass().getMethod("toExternalOrNull", Object.class);
+
+            // Return the lambda to invoke the converter
+            return rowData -> {
+                try {
+                    return (Row) toExternalOrNull.invoke(converter, rowData);
+                } catch (IllegalAccessException | InvocationTargetException e) {
+                    Assertions.fail(
+                            "Something went wrong when trying to use the DataStructureConverter from flink-table-runtime",
+                            e);
+                    return null; // For the compiler
+                }
+            };
+        } catch (ClassNotFoundException
+                | InvocationTargetException
+                | NoSuchMethodException
+                | IllegalAccessException e) {
+            Assertions.fail(
+                    "Error when trying to use the RowData to Row conversion. "
+                            + "Perhaps you miss flink-table-runtime in your test classpath?",
+                    e);
+            return null; // For the compiler
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java
index eb2e366..771e117 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeAssert.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.types.LogicalTypesTest;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -37,6 +38,7 @@ import static org.assertj.core.api.Assertions.fail;
 import static org.assertj.core.api.Assertions.not;
 
 /** Aseertions for {@link LogicalType}. */
+@Experimental
 public class LogicalTypeAssert extends AbstractAssert<LogicalTypeAssert, 
LogicalType> {
 
     public LogicalTypeAssert(LogicalType logicalType) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java
index 7c8983b..bc7905c 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/LogicalTypeConditions.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.assertj.core.api.Condition;
 
 /** Set of conditions to test properties of {@link LogicalType}. */
+@Experimental
 public class LogicalTypeConditions {
 
     private LogicalTypeConditions() {}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/MapDataAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/MapDataAssert.java
new file mode 100644
index 0000000..f212082
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/MapDataAssert.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.assertj.core.api.AbstractAssert;
+
+import java.util.Objects;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Assertions for {@link MapData}.
+ *
+ * <p>Use {@link #asGeneric(LogicalType)} to compare maps by value after converting them to
+ * {@link GenericMapData}.
+ */
+@Experimental
+public class MapDataAssert extends AbstractAssert<MapDataAssert, MapData> {
+
+    public MapDataAssert(MapData mapData) {
+        super(mapData, MapDataAssert.class);
+    }
+
+    /** Verifies that the actual map is not null and has the given number of entries. */
+    public MapDataAssert hasSize(int size) {
+        isNotNull();
+        final int actualSize = this.actual.size();
+        assertThat(actualSize).isEqualTo(size);
+        return this;
+    }
+
+    /** Same as {@link #asGeneric(LogicalType)}, using the logical type of the given data type. */
+    public MapDataAssert asGeneric(DataType dataType) {
+        final LogicalType logicalType = dataType.getLogicalType();
+        return asGeneric(logicalType);
+    }
+
+    /**
+     * Returns a new assertion on the actual map converted to {@link GenericMapData}. The returned
+     * assertion compares values through their generic representation.
+     */
+    public MapDataAssert asGeneric(LogicalType logicalType) {
+        final GenericMapData convertedActual =
+                InternalDataUtils.toGenericMap(this.actual, logicalType);
+        return new MapDataAssert(convertedActual)
+                .usingComparator(
+                        (left, right) ->
+                                compareAsGeneric(convertedActual, left, right, logicalType));
+    }
+
+    /**
+     * Compares two maps through their generic representation. Returns 0 only when the generic
+     * forms are equal; otherwise the sign is derived from the hash codes and is never 0.
+     */
+    private static int compareAsGeneric(
+            GenericMapData convertedActual,
+            MapData left,
+            MapData right,
+            LogicalType logicalType) {
+        // The already converted actual value is passed through untouched
+        final MapData genericLeft =
+                left == convertedActual ? left : InternalDataUtils.toGenericMap(left, logicalType);
+        final MapData genericRight =
+                right == convertedActual
+                        ? right
+                        : InternalDataUtils.toGenericMap(right, logicalType);
+        if (Objects.equals(genericLeft, genericRight)) {
+            return 0;
+        }
+        return Objects.hashCode(genericLeft) < Objects.hashCode(genericRight) ? -1 : 1;
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowAssert.java
similarity index 60%
copy from 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
copy to 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowAssert.java
index d98d124..5cfe3d9 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowAssert.java
@@ -18,26 +18,31 @@
 
 package org.apache.flink.table.test;
 
-import org.apache.flink.table.data.StringData;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.assertj.core.api.AbstractAssert;
-import org.assertj.core.api.ByteArrayAssert;
-import org.assertj.core.api.StringAssert;
 
-/** Assertions for {@link StringData}. */
-public class StringDataAssert extends AbstractAssert<StringDataAssert, 
StringData> {
+import static org.assertj.core.api.Assertions.assertThat;
 
-    public StringDataAssert(StringData stringData) {
-        super(stringData, StringDataAssert.class);
+/** Assertions for {@link Row}. */
+@Experimental
+public class RowAssert extends AbstractAssert<RowAssert, Row> {
+
+    public RowAssert(Row row) {
+        super(row, RowAssert.class);
     }
 
-    public StringAssert asString() {
+    public RowAssert hasKind(RowKind kind) {
         isNotNull();
-        return new StringAssert(this.actual.toString());
+        assertThat(this.actual.getKind()).isEqualTo(kind);
+        return this;
     }
 
-    public ByteArrayAssert asBytes() {
+    public RowAssert hasArity(int arity) {
         isNotNull();
-        return new ByteArrayAssert(this.actual.toBytes());
+        assertThat(this.actual.getArity()).isEqualTo(arity);
+        return this;
     }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java
index 67e5666..4ec3ad3 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataAssert.java
@@ -18,16 +18,23 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.RowKind;
 
 import org.assertj.core.api.AbstractAssert;
 import org.assertj.core.api.LongAssert;
 import org.assertj.core.api.StringAssert;
 
+import java.util.Objects;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Assertions for {@link RowData}. */
+@Experimental
 public class RowDataAssert extends AbstractAssert<RowDataAssert, RowData> {
 
     public RowDataAssert(RowData rowData) {
@@ -71,4 +78,29 @@ public class RowDataAssert extends 
AbstractAssert<RowDataAssert, RowData> {
         assertThat(this.actual.isNullAt(index)).isFalse();
         return this;
     }
+
+    /**
+     * Same as {@link #asGeneric(LogicalType)}, using the logical type of the given {@link
+     * DataType}.
+     */
+    public RowDataAssert asGeneric(DataType dataType) {
+        return asGeneric(dataType.getLogicalType());
+    }
+
+    /**
+     * Returns a new assertion on the actual row converted to {@link GenericRowData}. The returned
+     * assertion compares values through their generic representation: the comparison yields 0
+     * only when the generic forms are equal.
+     */
+    public RowDataAssert asGeneric(LogicalType logicalType) {
+        final GenericRowData convertedActual =
+                InternalDataUtils.toGenericRow(this.actual, logicalType);
+        final RowDataAssert genericAssert = new RowDataAssert(convertedActual);
+        return genericAssert.usingComparator(
+                (left, right) -> {
+                    // The already converted actual value is passed through untouched
+                    final RowData genericLeft =
+                            left == convertedActual
+                                    ? left
+                                    : InternalDataUtils.toGenericRow(left, logicalType);
+                    final RowData genericRight =
+                            right == convertedActual
+                                    ? right
+                                    : InternalDataUtils.toGenericRow(right, logicalType);
+                    if (Objects.equals(genericLeft, genericRight)) {
+                        return 0;
+                    }
+                    return Objects.hashCode(genericLeft) < Objects.hashCode(genericRight)
+                            ? -1
+                            : 1;
+                });
+    }
+
+    /**
+     * Converts the actual {@link RowData} to an external row using the given {@link DataType}
+     * and returns a {@link RowAssert} on the result.
+     *
+     * <p>In order to execute this assertion, you need flink-table-runtime in the classpath.
+     */
+    public RowAssert asRow(DataType dataType) {
+        return new RowAssert(
+                
InternalDataUtils.resolveToExternalOrNull(dataType).apply(this.actual));
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataListAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataListAssert.java
new file mode 100644
index 0000000..7c1ec6e
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/RowDataListAssert.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.test;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.assertj.core.api.AbstractListAssert;
+import org.assertj.core.api.ListAssert;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Assertions for a {@link List} of {@link RowData}.
+ *
+ * <p>Assertions on single elements are instances of {@link RowDataAssert}.
+ */
+@Experimental
+public class RowDataListAssert
+        extends AbstractListAssert<RowDataListAssert, List<RowData>, RowData, RowDataAssert> {
+
+    public RowDataListAssert(List<RowData> rowDataList) {
+        super(rowDataList, RowDataListAssert.class);
+    }
+
+    @Override
+    protected RowDataAssert toAssert(RowData value, String description) {
+        final RowDataAssert elementAssert = new RowDataAssert(value);
+        return elementAssert.as(description);
+    }
+
+    @Override
+    @SuppressWarnings({"unchecked"})
+    protected RowDataListAssert newAbstractIterableAssert(Iterable<? extends RowData> iterable) {
+        final List<RowData> rows;
+        if (iterable instanceof List) {
+            // Reuse the list as is instead of copying it
+            rows = (List<RowData>) iterable;
+        } else {
+            rows =
+                    StreamSupport.stream(iterable.spliterator(), false)
+                            .collect(Collectors.toList());
+        }
+        return new RowDataListAssert(rows);
+    }
+
+    /** Same as {@link #asGeneric(LogicalType)}, using the logical type of the given data type. */
+    public RowDataListAssert asGeneric(DataType dataType) {
+        final LogicalType logicalType = dataType.getLogicalType();
+        return asGeneric(logicalType);
+    }
+
+    /**
+     * Configures this assertion to compare elements through their generic representation: both
+     * rows are converted with the given logical type, and the comparison yields 0 only when the
+     * converted rows are equal.
+     */
+    public RowDataListAssert asGeneric(LogicalType logicalType) {
+        return usingElementComparator(
+                (left, right) -> {
+                    final RowData genericLeft = InternalDataUtils.toGenericRow(left, logicalType);
+                    final RowData genericRight =
+                            InternalDataUtils.toGenericRow(right, logicalType);
+                    if (Objects.equals(genericLeft, genericRight)) {
+                        return 0;
+                    }
+                    return Objects.hashCode(genericLeft) < Objects.hashCode(genericRight)
+                            ? -1
+                            : 1;
+                });
+    }
+
+    /**
+     * Converts every row of the actual list to {@link Row} using the given data type and returns
+     * a list assertion on the result.
+     *
+     * <p>In order to execute this assertion, you need flink-table-runtime in the classpath.
+     */
+    public ListAssert<Row> asRows(DataType dataType) {
+        return new ListAssert<>(
+                this.actual.stream().map(InternalDataUtils.resolveToExternalOrNull(dataType)));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
index d98d124..ae4b52d 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/StringDataAssert.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.table.data.StringData;
 
 import org.assertj.core.api.AbstractAssert;
@@ -25,6 +26,7 @@ import org.assertj.core.api.ByteArrayAssert;
 import org.assertj.core.api.StringAssert;
 
 /** Assertions for {@link StringData}. */
+@Experimental
 public class StringDataAssert extends AbstractAssert<StringDataAssert, 
StringData> {
 
     public StringDataAssert(StringData stringData) {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java
index 510a86a..2bf36b6 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/test/TableAssertions.java
@@ -18,26 +18,108 @@
 
 package org.apache.flink.table.test;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.Row;
+
+import org.assertj.core.api.AbstractAssert;
+import org.assertj.core.api.Assertions;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 /** Entrypoint for all the various table assertions. */
+@Experimental
 public class TableAssertions {
 
     private TableAssertions() {}
 
+    // --- External data structures
+
+    /** Creates assertions for an external {@link Row}. */
+    public static RowAssert assertThat(Row row) {
+        return new RowAssert(row);
+    }
+
     // --- Internal data structures
 
+    /** Creates assertions for internal {@link ArrayData}. */
+    public static ArrayDataAssert assertThat(ArrayData actual) {
+        return new ArrayDataAssert(actual);
+    }
+
+    /** Creates assertions for internal {@link MapData}. */
+    public static MapDataAssert assertThat(MapData actual) {
+        return new MapDataAssert(actual);
+    }
+
     public static RowDataAssert assertThat(RowData actual) {
         return new RowDataAssert(actual);
     }
 
+    /**
+     * Creates list assertions for the rows produced by the given iterator. The iterator is
+     * consumed in order and its elements are collected into a list.
+     */
+    public static RowDataListAssert assertThatRows(Iterator<RowData> actual) {
+        final Spliterator<RowData> spliterator =
+                Spliterators.spliteratorUnknownSize(actual, Spliterator.ORDERED);
+        final List<RowData> rows =
+                StreamSupport.stream(spliterator, false).collect(Collectors.toList());
+        return new RowDataListAssert(rows);
+    }
+
+    /**
+     * Creates list assertions for the given rows. A {@link List} is used as is; any other
+     * iterable is collected into a new list.
+     */
+    public static RowDataListAssert assertThatRows(Iterable<RowData> actual) {
+        final List<RowData> rows;
+        if (actual instanceof List) {
+            rows = (List<RowData>) actual;
+        } else {
+            rows = StreamSupport.stream(actual.spliterator(), false).collect(Collectors.toList());
+        }
+        return new RowDataListAssert(rows);
+    }
+
+    /** Creates list assertions for the rows of the given stream, which is consumed entirely. */
+    public static RowDataListAssert assertThatRows(Stream<RowData> actual) {
+        final List<RowData> rows = actual.collect(Collectors.toList());
+        return new RowDataListAssert(rows);
+    }
+
+    /** Creates list assertions for the given rows, in the order they are passed. */
+    public static RowDataListAssert assertThatRows(RowData... rows) {
+        final List<RowData> rowList = Arrays.asList(rows);
+        return new RowDataListAssert(rowList);
+    }
+
     public static StringDataAssert assertThat(StringData actual) {
         return new StringDataAssert(actual);
     }
 
+    /**
+     * Creates an assertion for internal data whose type can vary. {@link ArrayData}, {@link
+     * MapData} and {@link RowData} are converted to generic data structures supporting equality
+     * assertions; {@link StringData} and any other object are asserted as they are.
+     *
+     * <p>If the data to assert is always of the same type, use the dedicated assertions such as
+     * {@link #assertThat(RowData)} instead.
+     */
+    public static AbstractAssert<?, ?> assertThatGenericDataOfType(
+            Object actual, LogicalType logicalType) {
+        if (actual instanceof ArrayData) {
+            final ArrayDataAssert arrayAssert = new ArrayDataAssert((ArrayData) actual);
+            return arrayAssert.asGeneric(logicalType);
+        }
+        if (actual instanceof MapData) {
+            final MapDataAssert mapAssert = new MapDataAssert((MapData) actual);
+            return mapAssert.asGeneric(logicalType);
+        }
+        if (actual instanceof RowData) {
+            final RowDataAssert rowAssert = new RowDataAssert((RowData) actual);
+            return rowAssert.asGeneric(logicalType);
+        }
+        if (actual instanceof StringData) {
+            return new StringDataAssert((StringData) actual);
+        }
+
+        // Everything else falls back to a plain object assertion
+        return Assertions.assertThatObject(actual);
+    }
+
+    /**
+     * Same as {@link #assertThatGenericDataOfType(Object, LogicalType)}, using the logical type
+     * of the given {@link DataType}.
+     */
+    public static AbstractAssert<?, ?> assertThatGenericDataOfType(
+            Object actual, DataType dataType) {
+        return assertThatGenericDataOfType(actual, dataType.getLogicalType());
+    }
+
     // --- Types
 
     public static DataTypeAssert assertThat(DataType actual) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 301ab4c..48d2b75 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -26,12 +26,9 @@ import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.data.binary.BinaryStringDataUtil;
-import org.apache.flink.table.data.conversion.DataStructureConverter;
-import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.data.utils.CastExecutor;
 import org.apache.flink.table.planner.functions.CastFunctionITCase;
 import org.apache.flink.table.types.DataType;
@@ -90,6 +87,7 @@ import static org.apache.flink.table.api.DataTypes.YEAR;
 import static org.apache.flink.table.data.DecimalData.fromBigDecimal;
 import static org.apache.flink.table.data.StringData.fromString;
 import static org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+import static 
org.apache.flink.table.test.TableAssertions.assertThatGenericDataOfType;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -1267,14 +1265,12 @@ class CastRulesTest {
             this.inputTypes.add(srcDataType);
             this.assertionExecutors.add(
                     executor -> {
-                        Object expected = sanitizeTestData(targetType, target);
-
-                        assertThat(sanitizeTestData(targetType, 
executor.cast(src)))
-                                .isEqualTo(expected);
-                        assertThat(sanitizeTestData(targetType, 
executor.cast(src)))
+                        assertThatGenericDataOfType(executor.cast(src), 
targetType)
+                                .isEqualTo(target);
+                        assertThatGenericDataOfType(executor.cast(src), 
targetType)
                                 .as(
                                         "Error when reusing the rule. Perhaps 
there is some state that needs to be reset")
-                                .isEqualTo(expected);
+                                .isEqualTo(target);
                     });
             this.descriptions.add("{" + src + " => " + target + "}");
             this.castContexts.add(castContext);
@@ -1319,17 +1315,5 @@ class CastRulesTest {
             }
             return Arrays.stream(testSpecs);
         }
-
-        // This method makes sure that we can correctly compare rows
-        private Object sanitizeTestData(DataType dataType, Object value) {
-            if (value instanceof RowData) {
-                // Convert to GenericRowData using the DataStructureConverter
-                DataStructureConverter<Object, Object> converter =
-                        DataStructureConverters.getConverter(dataType);
-                converter.open(Thread.currentThread().getContextClassLoader());
-                return 
converter.toInternalOrNull(converter.toExternalOrNull(value));
-            }
-            return value;
-        }
     }
 }
diff --git a/flink-table/flink-table-test-utils/pom.xml b/flink-table/flink-table-test-utils/pom.xml
new file mode 100644
index 0000000..28f14ad
--- /dev/null
+++ b/flink-table/flink-table-test-utils/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-table</artifactId>
+               <version>1.15-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-table-test-utils</artifactId>
+       <name>Flink : Table : Test Utils</name>
+       <description>
+               This module contains test utilities for the Table API/SQL 
ecosystem for end users, connectors and formats.
+       </description>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <!-- Core test utils (MiniCluster) -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils-junit</artifactId>
+                       <version>${project.version}</version>
+                       <scope>compile</scope>
+               </dependency>
+
+               <!-- Table dependencies required to create and run a SQL job -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java-bridge</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-runtime</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner-loader</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- We bring the assertions from flink-table-common and 
repackage them -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <classifier>tests</classifier>
+                       <type>test-jar</type>
+                       <scope>compile</scope>
+                       <!-- We don't want table-common to be used as 
transitive dependency -->
+                       <optional>true</optional>
+               </dependency>
+
+               <!-- Required for the assertions -->
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>compile</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <!-- Exclude all flink-dist files and 
only include flink-table-* -->
+                                       <execution>
+                                               <id>shade-flink</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <filters>
+                                                               <filter>
+                                                                       <!-- Include only the assertion classes from the table-common test jar. The jar holds compiled .class entries, so the pattern has to match .class files; a .java pattern would select nothing. -->
+                                                                       <artifact>org.apache.flink:flink-table-common:tests</artifact>
+                                                                       <includes>
+                                                                               <include>org/apache/flink/table/test/*.class</include>
+                                                                       </includes>
+                                                               </filter>
+                                                       </filters>
+                                                       <artifactSet>
+                                                               <includes 
combine.children="append">
+                                                                       
<include>org.apache.flink:flink-table-common:tests</include>
+                                                               </includes>
+                                                       </artifactSet>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
diff --git a/flink-table/flink-table-test-utils/src/test/java/TableAssertionTest.java b/flink-table/flink-table-test-utils/src/test/java/TableAssertionTest.java
new file mode 100644
index 0000000..05a54b9
--- /dev/null
+++ b/flink-table/flink-table-test-utils/src/test/java/TableAssertionTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.test.TableAssertions;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import static org.apache.flink.table.api.DataTypes.ARRAY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.test.TableAssertions.assertThat;
+import static org.apache.flink.table.test.TableAssertions.assertThatRows;
+
+/** Checks {@link TableAssertions} row assertions after conversion to generic data and to rows. */
+class TableAssertionTest {
+
+    @Test
+    void testAssertRowDataWithConversion() {
+        final DataType dataType =
+                ROW(
+                        FIELD("a", INT()),
+                        FIELD("b", STRING()),
+                        FIELD("c", ARRAY(BOOLEAN().notNull())));
+        final RowType rowType = (RowType) dataType.getLogicalType();
+
+        // One logical row in three representations: generic, binary and Row.
+        final StringData text = StringData.fromString("my string");
+        final GenericArrayData flags = new GenericArrayData(new boolean[] {true, false});
+        final GenericRowData expectedGeneric = GenericRowData.of(10, text, flags);
+        final BinaryRowData binaryRow =
+                new RowDataSerializer(rowType).toBinaryRow(expectedGeneric);
+        final Row expectedRow = Row.of(10, "my string", new Boolean[] {true, false});
+
+        // Once converted to generic data, the binary row must equal the generic row as well
+        // as a copy of itself, both as a single row and as a list of rows.
+        assertThat(binaryRow)
+                .asGeneric(dataType)
+                .isEqualTo(expectedGeneric)
+                .isEqualTo(binaryRow.copy());
+        assertThatRows(binaryRow)
+                .asGeneric(dataType)
+                .containsOnly(expectedGeneric)
+                .containsOnly(binaryRow);
+
+        // Converted to Row, the binary row must equal the expected Row.
+        assertThat(binaryRow).asRow(dataType).isEqualTo(expectedRow);
+        assertThatRows(binaryRow).asRows(dataType).containsOnly(expectedRow);
+    }
+}
diff --git a/flink-table/pom.xml b/flink-table/pom.xml
index b8605d2..b091829 100644
--- a/flink-table/pom.xml
+++ b/flink-table/pom.xml
@@ -47,6 +47,7 @@ under the License.
                <module>flink-sql-parser</module>
                <module>flink-sql-parser-hive</module>
                <module>flink-table-code-splitter</module>
+               <module>flink-table-test-utils</module>
        </modules>
 
        <dependencyManagement>
diff --git a/tools/ci/stage.sh b/tools/ci/stage.sh
index 894977f..e28aae1 100755
--- a/tools/ci/stage.sh
+++ b/tools/ci/stage.sh
@@ -68,7 +68,8 @@ flink-table/flink-sql-client,\
 flink-table/flink-table-planner,\
 flink-table/flink-table-planner-loader,\
 flink-table/flink-table-runtime,\
-flink-table/flink-table-code-splitter"
+flink-table/flink-table-code-splitter,\
+flink-table/flink-table-test-utils"
 
 MODULES_CONNECTORS="\
 flink-contrib/flink-connector-wikiedits,\

Reply via email to