This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a71c200b470ffe7ae614870d5855d1e24673c08b
Author: Timo Walther <twal...@apache.org>
AuthorDate: Tue May 21 13:35:46 2019 +0200

    [hotfix][table-common] Fix equality of data types with same conversion class
---
 .../flink/table/types/CollectionDataType.java      | 26 +++++++++++++---------
 .../org/apache/flink/table/types/DataType.java     | 20 +++++++++++------
 .../org/apache/flink/table/types/DataTypeTest.java |  6 +++++
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 17b7096..b50fea9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -43,7 +43,7 @@ public final class CollectionDataType extends DataType {
                        LogicalType logicalType,
                        @Nullable Class<?> conversionClass,
                        DataType elementDataType) {
-               super(logicalType, conversionClass);
+               super(logicalType, ensureArrayConversionClass(logicalType, 
elementDataType, conversionClass));
                this.elementDataType = 
Preconditions.checkNotNull(elementDataType, "Element data type must not be 
null.");
        }
 
@@ -82,16 +82,6 @@ public final class CollectionDataType extends DataType {
        }
 
        @Override
-       public Class<?> getConversionClass() {
-               // arrays are a special case because their default conversion 
class depends on the
-               // conversion class of the element type
-               if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && 
conversionClass == null) {
-                       return 
Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
-               }
-               return super.getConversionClass();
-       }
-
-       @Override
        public <R> R accept(DataTypeVisitor<R> visitor) {
                return visitor.visit(this);
        }
@@ -115,4 +105,18 @@ public final class CollectionDataType extends DataType {
        public int hashCode() {
                return Objects.hash(super.hashCode(), elementDataType);
        }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static Class<?> ensureArrayConversionClass(
+                       LogicalType logicalType,
+                       DataType elementDataType,
+                       @Nullable Class<?> clazz) {
+               // arrays are a special case because their default conversion 
class depends on the
+               // conversion class of the element type
+               if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz 
== null) {
+                       return 
Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+               }
+               return clazz;
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
index 303a052..6b783e44 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -52,13 +52,15 @@ import java.util.Objects;
 @PublicEvolving
 public abstract class DataType implements Serializable {
 
-       protected LogicalType logicalType;
+       protected final LogicalType logicalType;
 
-       protected @Nullable Class<?> conversionClass;
+       protected final Class<?> conversionClass;
 
        DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
                this.logicalType = Preconditions.checkNotNull(logicalType, 
"Logical type must not be null.");
-               this.conversionClass = performEarlyClassValidation(logicalType, 
conversionClass);
+               this.conversionClass = performEarlyClassValidation(
+                       logicalType,
+                       ensureConversionClass(logicalType, conversionClass));
        }
 
        /**
@@ -79,9 +81,6 @@ public abstract class DataType implements Serializable {
         * @return the expected conversion class
         */
        public Class<?> getConversionClass() {
-               if (conversionClass == null) {
-                       return logicalType.getDefaultConversion();
-               }
                return conversionClass;
        }
 
@@ -133,7 +132,7 @@ public abstract class DataType implements Serializable {
                }
                DataType dataType = (DataType) o;
                return logicalType.equals(dataType.logicalType) &&
-                       Objects.equals(conversionClass, 
dataType.conversionClass);
+                       conversionClass.equals(dataType.conversionClass);
        }
 
        @Override
@@ -162,4 +161,11 @@ public abstract class DataType implements Serializable {
                }
                return candidate;
        }
+
+       private static Class<?> ensureConversionClass(LogicalType logicalType, 
@Nullable Class<?> clazz) {
+               if (clazz == null) {
+                       return logicalType.getDefaultConversion();
+               }
+               return clazz;
+       }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index 2475db8..88b5cfd 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.types;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 
 import org.junit.Test;
@@ -128,4 +129,9 @@ public class DataTypeTest {
        public void testInvalidOrderInterval() {
                INTERVAL(MONTH(), YEAR(2));
        }
+
+       @Test
+       public void testConversionEquality() {
+               assertEquals(DataTypes.VARCHAR(2).bridgedTo(String.class), 
DataTypes.VARCHAR(2));
+       }
 }

Reply via email to