This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6070817 [FLINK-11702][table-planner-blink] Introduce a new blink table type system: InternalType. 6070817 is described below commit 607081782471e0989432c1fce3c592f4d79a3871 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Mon Feb 25 16:15:48 2019 +0800 [FLINK-11702][table-planner-blink] Introduce a new blink table type system: InternalType. This closes #7817. --- .../org/apache/flink/table/type/ArrayType.java | 61 +++++++++++ .../org/apache/flink/table/type/AtomicType.java | 25 +++++ .../org/apache/flink/table/type/BooleanType.java | 29 +++++ .../java/org/apache/flink/table/type/ByteType.java | 29 +++++ .../java/org/apache/flink/table/type/CharType.java | 29 +++++ .../java/org/apache/flink/table/type/DateType.java | 44 ++++++++ .../org/apache/flink/table/type/DecimalType.java | 105 ++++++++++++++++++ .../org/apache/flink/table/type/DoubleType.java | 29 +++++ .../org/apache/flink/table/type/FloatType.java | 29 +++++ .../org/apache/flink/table/type/GenericType.java | 83 +++++++++++++++ .../java/org/apache/flink/table/type/IntType.java | 29 +++++ .../org/apache/flink/table/type/InternalType.java | 38 +++++++ .../org/apache/flink/table/type/InternalTypes.java | 84 +++++++++++++++ .../java/org/apache/flink/table/type/LongType.java | 29 +++++ .../java/org/apache/flink/table/type/MapType.java | 77 ++++++++++++++ .../org/apache/flink/table/type/PrimitiveType.java | 41 ++++++++ .../java/org/apache/flink/table/type/RowType.java | 109 +++++++++++++++++++ .../org/apache/flink/table/type/ShortType.java | 29 +++++ .../org/apache/flink/table/type/StringType.java | 45 ++++++++ .../java/org/apache/flink/table/type/TimeType.java | 45 ++++++++ .../org/apache/flink/table/type/TimestampType.java | 44 ++++++++ .../apache/flink/table/type/TypeConverters.java | 117 +++++++++++++++++++++ .../apache/flink/table/type/InternalTypeTest.java | 112 ++++++++++++++++++++ 23 files changed, 1262 insertions(+) diff 
--git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java new file mode 100644 index 0000000..f825b22 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ArrayType.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +import org.apache.flink.util.Preconditions; + +/** + * Type for Array. 
+ */ +public class ArrayType implements InternalType { + + private final InternalType elementType; + + public ArrayType(InternalType elementType) { + this.elementType = Preconditions.checkNotNull(elementType); + } + + public InternalType getElementType() { + return elementType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ArrayType arrayType = (ArrayType) o; + + return elementType.equals(arrayType.elementType); + } + + @Override + public int hashCode() { + return elementType.hashCode(); + } + + @Override + public String toString() { + return "ArrayType{elementType=" + elementType + '}'; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java new file mode 100644 index 0000000..d499f54 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/AtomicType.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Atomic Type. 
+ */ +public interface AtomicType extends InternalType { +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java new file mode 100644 index 0000000..02416ed --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/BooleanType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Boolean type. + */ +public class BooleanType extends PrimitiveType { + + public static final BooleanType INSTANCE = new BooleanType(); + + private BooleanType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java new file mode 100644 index 0000000..343acb1 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ByteType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Byte type. + */ +public class ByteType extends PrimitiveType { + + public static final ByteType INSTANCE = new ByteType(); + + private ByteType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java new file mode 100644 index 0000000..bdb523c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/CharType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Char type. + */ +public class CharType extends PrimitiveType { + + public static final CharType INSTANCE = new CharType(); + + private CharType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java new file mode 100644 index 0000000..45588a6 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DateType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Sql date type. 
+ */ +public class DateType implements AtomicType { + + public static final DateType INSTANCE = new DateType(); + + private DateType() {} + + @Override + public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java new file mode 100644 index 0000000..b47534f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DecimalType.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +import java.math.BigDecimal; + +import static java.lang.String.format; + +/** + * Decimal type. + */ +public class DecimalType implements AtomicType { + + public static final int MAX_PRECISION = 38; + + private final int precision; + private final int scale; + + // Default to be same with Integer. 
+ public static final DecimalType USER_DEFAULT = new DecimalType(10, 0); + + // Mainly used for implicitly type cast and test. + public static final DecimalType SYSTEM_DEFAULT = new DecimalType(MAX_PRECISION, 18); + + public DecimalType(int precision, int scale) { + if (precision < 0) { + throw new IllegalArgumentException(format("Decimal precision (%s) cannot be negative.", + precision)); + } + + if (scale > precision) { + throw new IllegalArgumentException(format("Decimal scale (%s) cannot be greater than " + + "precision (%s).", scale, precision)); + } + + if (precision > MAX_PRECISION) { + throw new IllegalArgumentException( + "DecimalType can only support precision up to " + MAX_PRECISION); + } + + this.precision = precision; + this.scale = scale; + } + + public int precision() { + return this.precision; + } + + public int scale() { + return this.scale; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DecimalType that = (DecimalType) o; + + return precision == that.precision && scale == that.scale; + } + + @Override + public int hashCode() { + int result = precision; + result = 31 * result + scale; + return result; + } + + @Override + public String toString() { + return "DecimalType{" + + "precision=" + precision + + ", scale=" + scale + + '}'; + } + + public static DecimalType of(int precision, int scale) { + return new DecimalType(precision, scale); + } + + public static DecimalType of(BigDecimal value) { + return of(value.precision(), value.scale()); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java new file mode 100644 index 0000000..fb1e46b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/DoubleType.java @@ -0,0 +1,29 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Double type. + */ +public class DoubleType extends PrimitiveType { + + public static final DoubleType INSTANCE = new DoubleType(); + + private DoubleType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java new file mode 100644 index 0000000..a250f161 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/FloatType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Float type. + */ +public class FloatType extends PrimitiveType { + + public static final FloatType INSTANCE = new FloatType(); + + private FloatType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java new file mode 100644 index 0000000..828b935 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/GenericType.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.type; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Generic type. + */ +public class GenericType<T> implements AtomicType { + + private final TypeInformation<T> typeInfo; + private TypeSerializer<T> serializer; + + public GenericType(Class<T> typeClass) { + this(new GenericTypeInfo<>(typeClass)); + } + + public GenericType(TypeInformation<T> typeInfo) { + this.typeInfo = checkNotNull(typeInfo); + } + + public TypeInformation<T> getTypeInfo() { + return typeInfo; + } + + public Class<T> getTypeClass() { + return typeInfo.getTypeClass(); + } + + public TypeSerializer<T> getSerializer() { + if (serializer == null) { + this.serializer = typeInfo.createSerializer(new ExecutionConfig()); + } + return serializer; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + GenericType<?> that = (GenericType<?>) o; + return typeInfo.equals(that.typeInfo); + } + + @Override + public int hashCode() { + return typeInfo.hashCode(); + } + + @Override + public String toString() { + return "GenericType{" + + "typeClass=" + typeInfo.getTypeClass() + + '}'; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java new file mode 100644 index 0000000..e89032b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/IntType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Int type. + */ +public class IntType extends PrimitiveType { + + public static final IntType INSTANCE = new IntType(); + + private IntType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java new file mode 100644 index 0000000..452f2ee --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalType.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +import java.io.Serializable; + +/** + * InternalType is the base type of all Flink SQL types; it is used for internal computation + * and code generation. + * + * <p>An InternalType may correspond to multiple data formats. Users work with the basic + * Java data format, while a more efficient binary format is used inside Table for + * performance. For example, StringType corresponds to BinaryString internally and to JDK + * String at the user layer. + * So an InternalType may correspond to multiple TypeSerializers. + * + * <p>Only a limited set of data formats is supported, because all other data formats are converted + * into internally supported formats when executed inside the table. Data is converted + * back to the user format when needed (for example, in a UDF). + */ +public interface InternalType extends Serializable { +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java new file mode 100644 index 0000000..f3429b0 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Accessor of {@link InternalType}s. + */ +public class InternalTypes { + + public static final StringType STRING = StringType.INSTANCE; + + public static final BooleanType BOOLEAN = BooleanType.INSTANCE; + + public static final DoubleType DOUBLE = DoubleType.INSTANCE; + + public static final FloatType FLOAT = FloatType.INSTANCE; + + public static final ByteType BYTE = ByteType.INSTANCE; + + public static final IntType INT = IntType.INSTANCE; + + public static final LongType LONG = LongType.INSTANCE; + + public static final ShortType SHORT = ShortType.INSTANCE; + + public static final CharType CHAR = CharType.INSTANCE; + + public static final ArrayType BINARY = new ArrayType(BYTE); + + public static final DateType DATE = DateType.INSTANCE; + + public static final TimestampType TIMESTAMP = TimestampType.INSTANCE; + + public static final TimeType TIME = TimeType.INSTANCE; + + public static final DecimalType SYSTEM_DEFAULT_DECIMAL = DecimalType.SYSTEM_DEFAULT; + + public static ArrayType createArrayType(InternalType elementType) { + return new ArrayType(elementType); + } + + public static DecimalType createDecimalType(int precision, int scale) { + return new DecimalType(precision, scale); + } + + public static MapType createMapType(InternalType keyType, InternalType valueType) { + return new MapType(keyType, valueType); + } + + public static <T> GenericType<T> createGenericType(Class<T> cls) { + return new GenericType<>(cls); + } + + 
public static <T> GenericType<T> createGenericType(TypeInformation<T> typeInfo) { + return new GenericType<>(typeInfo); + } + + public static RowType createRowType(InternalType[] types, String[] fieldNames) { + return new RowType(types, fieldNames); + } + + public static RowType createRowType(InternalType... types) { + return new RowType(types); + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java new file mode 100644 index 0000000..c21936f --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/LongType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Long type. 
+ */ +public class LongType extends PrimitiveType { + + public static final LongType INSTANCE = new LongType(); + + private LongType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java new file mode 100644 index 0000000..4498d5d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/MapType.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Map type. 
+ */ +public class MapType implements InternalType { + + private final InternalType keyType; + private final InternalType valueType; + + public MapType(InternalType keyType, InternalType valueType) { + if (keyType == null) { + throw new IllegalArgumentException("keyType should not be null."); + } + if (valueType == null) { + throw new IllegalArgumentException("valueType should not be null."); + } + this.keyType = keyType; + this.valueType = valueType; + } + + public InternalType getKeyType() { + return keyType; + } + + public InternalType getValueType() { + return valueType; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MapType mapType = (MapType) o; + + return getKeyType().equals(mapType.getKeyType()) && + getValueType().equals(mapType.getValueType()); + } + + @Override + public int hashCode() { + int result = getKeyType().hashCode(); + result = 31 * result + getValueType().hashCode(); + return result; + } + + @Override + public String toString() { + return "MapType{" + + "keyType=" + keyType + + ", valueType=" + valueType + + '}'; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java new file mode 100644 index 0000000..002f4e0 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Primitive type. + */ +public abstract class PrimitiveType implements AtomicType { + + @Override + public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java new file mode 100644 index 0000000..6c02996 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/RowType.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +import org.apache.flink.types.Row; + +import java.util.Arrays; + +/** + * Row type for row. + * + * <p>It's internal data structure is BaseRow, and it's external data structure is {@link Row}. + */ +public class RowType implements InternalType { + + private final InternalType[] types; + + private final String[] fieldNames; + + public RowType(InternalType... types) { + this(types, generateDefaultFieldNames(types.length)); + } + + public RowType(InternalType[] types, String[] fieldNames) { + this.types = types; + this.fieldNames = fieldNames; + if (types.length != fieldNames.length) { + throw new IllegalArgumentException("Types should be the same length as names, types is: " + + Arrays.toString(types) + ", and the names: " + Arrays.toString(fieldNames)); + } + } + + private static String[] generateDefaultFieldNames(int length) { + String[] fieldNames = new String[length]; + for (int i = 0; i < length; i++) { + fieldNames[i] = "f" + i; + } + return fieldNames; + } + + public int getArity() { + return types.length; + } + + public InternalType[] getFieldTypes() { + return types; + } + + public InternalType getTypeAt(int i) { + return types[i]; + } + + public String[] getFieldNames() { + return fieldNames; + } + + public int getFieldIndex(String fieldName) { + for (int i = 0; i < fieldNames.length; i++) { + if (fieldNames[i].equals(fieldName)) { + return i; + } + } + return -1; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + RowType that = (RowType) o; + + // RowType comparisons should not compare names and are compatible with the behavior of CompositeTypeInfo. 
+ return Arrays.equals(getFieldTypes(), that.getFieldTypes()); + } + + @Override + public int hashCode() { + return Arrays.hashCode(types); + } + + @Override + public String toString() { + return "RowType{" + + "types=" + Arrays.toString(types) + + ", fieldNames=" + Arrays.toString(fieldNames) + + '}'; + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java new file mode 100644 index 0000000..32d242d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/ShortType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * short type. 
+ */ +public class ShortType extends PrimitiveType { + + public static final ShortType INSTANCE = new ShortType(); + + private ShortType() {} +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java new file mode 100644 index 0000000..8893dc2 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/StringType.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * String type. 
+ */ +public class StringType implements AtomicType { + + public static final StringType INSTANCE = new StringType(); + + private StringType() { + } + + @Override + public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java new file mode 100644 index 0000000..b3cdf5b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimeType.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Sql time type. 
+ */ +public class TimeType implements AtomicType { + + public static final TimeType INSTANCE = new TimeType(); + + private TimeType() {} + + @Override + public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } + +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java new file mode 100644 index 0000000..12aa689 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TimestampType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +/** + * Sql timestamp type. 
+ */ +public class TimestampType implements AtomicType { + + public static final TimestampType INSTANCE = new TimestampType(); + + private TimestampType() {} + + @Override + public boolean equals(Object o) { + return this == o || o != null && getClass() == o.getClass(); + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java new file mode 100644 index 0000000..5bcf99d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.type; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * Converters of {@link InternalType} and {@link TypeInformation}. + */ +public class TypeConverters { + + public static final Map<TypeInformation, InternalType> TYPE_INFO_TO_INTERNAL_TYPE; + static { + Map<TypeInformation, InternalType> tiToType = new HashMap<>(); + tiToType.put(BasicTypeInfo.STRING_TYPE_INFO, InternalTypes.STRING); + tiToType.put(BasicTypeInfo.BOOLEAN_TYPE_INFO, InternalTypes.BOOLEAN); + tiToType.put(BasicTypeInfo.DOUBLE_TYPE_INFO, InternalTypes.DOUBLE); + tiToType.put(BasicTypeInfo.FLOAT_TYPE_INFO, InternalTypes.FLOAT); + tiToType.put(BasicTypeInfo.BYTE_TYPE_INFO, InternalTypes.BYTE); + tiToType.put(BasicTypeInfo.INT_TYPE_INFO, InternalTypes.INT); + tiToType.put(BasicTypeInfo.LONG_TYPE_INFO, InternalTypes.LONG); + tiToType.put(BasicTypeInfo.SHORT_TYPE_INFO, InternalTypes.SHORT); + tiToType.put(BasicTypeInfo.CHAR_TYPE_INFO, InternalTypes.CHAR); + tiToType.put(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, InternalTypes.BINARY); + tiToType.put(SqlTimeTypeInfo.DATE, InternalTypes.DATE); + tiToType.put(SqlTimeTypeInfo.TIMESTAMP, InternalTypes.TIMESTAMP); + 
tiToType.put(SqlTimeTypeInfo.TIME, InternalTypes.TIME); + + // BigDecimal have infinity precision and scale, but we converted it into a limited + // Decimal(38, 18). If the user's BigDecimal is more precision than this, we will + // throw Exception to remind user to use GenericType in real data conversion. + tiToType.put(BasicTypeInfo.BIG_DEC_TYPE_INFO, InternalTypes.SYSTEM_DEFAULT_DECIMAL); + TYPE_INFO_TO_INTERNAL_TYPE = Collections.unmodifiableMap(tiToType); + } + + /** + * Create a {@link InternalType} from a {@link TypeInformation}. + * + * <p>Note: Information may be lost. For example, after Pojo is converted to InternalType, + * we no longer know that it is a Pojo and only think it is a Row. + * + * <p>Eg: + * {@link BasicTypeInfo#STRING_TYPE_INFO} => {@link InternalTypes#STRING}. + * {@link BasicTypeInfo#BIG_DEC_TYPE_INFO} => {@link DecimalType}. + * {@link RowTypeInfo} => {@link RowType}. + * {@link PojoTypeInfo} (CompositeType) => {@link RowType}. + * {@link TupleTypeInfo} (CompositeType) => {@link RowType}. 
+ */ + public static InternalType createInternalTypeFromTypeInfo(TypeInformation typeInfo) { + InternalType type = TYPE_INFO_TO_INTERNAL_TYPE.get(typeInfo); + if (type != null) { + return type; + } + + if (typeInfo instanceof CompositeType) { + CompositeType compositeType = (CompositeType) typeInfo; + return InternalTypes.createRowType( + Stream.iterate(0, x -> x + 1).limit(compositeType.getArity()) + .map((Function<Integer, TypeInformation>) compositeType::getTypeAt) + .map(TypeConverters::createInternalTypeFromTypeInfo) + .toArray(InternalType[]::new), + compositeType.getFieldNames() + ); + } else if (typeInfo instanceof PrimitiveArrayTypeInfo) { + PrimitiveArrayTypeInfo arrayType = (PrimitiveArrayTypeInfo) typeInfo; + return InternalTypes.createArrayType( + createInternalTypeFromTypeInfo(arrayType.getComponentType())); + } else if (typeInfo instanceof BasicArrayTypeInfo) { + BasicArrayTypeInfo arrayType = (BasicArrayTypeInfo) typeInfo; + return InternalTypes.createArrayType( + createInternalTypeFromTypeInfo(arrayType.getComponentInfo())); + } else if (typeInfo instanceof ObjectArrayTypeInfo) { + ObjectArrayTypeInfo arrayType = (ObjectArrayTypeInfo) typeInfo; + return InternalTypes.createArrayType( + createInternalTypeFromTypeInfo(arrayType.getComponentInfo())); + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapType = (MapTypeInfo) typeInfo; + return InternalTypes.createMapType( + createInternalTypeFromTypeInfo(mapType.getKeyTypeInfo()), + createInternalTypeFromTypeInfo(mapType.getValueTypeInfo())); + } else { + return InternalTypes.createGenericType(typeInfo); + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java new file mode 100644 index 0000000..be2d148 --- /dev/null +++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/type/InternalTypeTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.type; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.util.InstantiationUtil; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.flink.table.type.TypeConverters.createInternalTypeFromTypeInfo; + +/** + * Test for {@link InternalType}s. 
+ */ +public class InternalTypeTest { + + @Test + public void testHashCodeAndEquals() throws IOException, ClassNotFoundException { + testHashCodeAndEquals(InternalTypes.INT); + testHashCodeAndEquals(InternalTypes.DATE); + testHashCodeAndEquals(InternalTypes.TIME); + testHashCodeAndEquals(InternalTypes.TIMESTAMP); + testHashCodeAndEquals(InternalTypes.BINARY); + testHashCodeAndEquals(InternalTypes.STRING); + testHashCodeAndEquals(new DecimalType(15, 5)); + testHashCodeAndEquals(new GenericType<>(InternalTypeTest.class)); + testHashCodeAndEquals(new RowType(InternalTypes.STRING, InternalTypes.INT, InternalTypes.INT)); + testHashCodeAndEquals(new ArrayType(InternalTypes.INT)); + testHashCodeAndEquals(new ArrayType(InternalTypes.STRING)); + testHashCodeAndEquals(new MapType(InternalTypes.STRING, InternalTypes.INT)); + } + + private void testHashCodeAndEquals(InternalType type) throws IOException, ClassNotFoundException { + InternalType newType = InstantiationUtil.deserializeObject + (InstantiationUtil.serializeObject(type), + Thread.currentThread().getContextClassLoader()); + + Assert.assertEquals(type.hashCode(), newType.hashCode()); + Assert.assertEquals(type, newType); + + // need override toString. 
+ Assert.assertFalse(newType.toString().contains("@")); + } + + @Test + public void testConverter() throws IOException, ClassNotFoundException { + testConvertToRowType(new RowTypeInfo( + new TypeInformation[] {Types.INT, Types.STRING}, + new String[] {"field1", "field2"})); + testConvertToRowType((CompositeType) TypeInformation.of(MyPojo.class)); + + testConvertCompare(PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, + new ArrayType(InternalTypes.DOUBLE)); + testConvertCompare(BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO, new ArrayType(InternalTypes.DOUBLE)); + testConvertCompare(ObjectArrayTypeInfo.getInfoFor(TypeInformation.of(MyPojo.class)), + new ArrayType(createInternalTypeFromTypeInfo(TypeInformation.of(MyPojo.class)))); + testConvertCompare(new MapTypeInfo<>(Types.INT, Types.STRING), + new MapType(InternalTypes.INT, InternalTypes.STRING)); + testConvertCompare(new GenericTypeInfo<>(MyPojo.class), new GenericType<>(MyPojo.class)); + } + + private void testConvertToRowType(CompositeType typeInfo) { + RowType rowType = (RowType) createInternalTypeFromTypeInfo(typeInfo); + Assert.assertArrayEquals( + new InternalType[] {InternalTypes.INT, InternalTypes.STRING}, + rowType.getFieldTypes()); + Assert.assertArrayEquals( + new String[] {"field1", "field2"}, + rowType.getFieldNames()); + } + + private void testConvertCompare(TypeInformation typeInfo, InternalType internalType) { + InternalType converted = createInternalTypeFromTypeInfo(typeInfo); + Assert.assertEquals(internalType, converted); + Assert.assertEquals(internalType.hashCode(), converted.hashCode()); + } + + /** + * Test pojo. + */ + public static class MyPojo { + public int field1; + public String field2; + } +}