This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f69c9c  Flink: Add conversion from Iceberg types to Flink types (#1174)
2f69c9c is described below

commit 2f69c9cb40cca57044d5a939be0cf34fa26eed4f
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 16 04:20:41 2020 +0800

    Flink: Add conversion from Iceberg types to Flink types (#1174)
---
 .../org/apache/iceberg/flink/FlinkSchemaUtil.java  |  55 +++++++++
 .../org/apache/iceberg/flink/TypeToFlinkType.java  | 132 +++++++++++++++++++++
 .../apache/iceberg/flink/TestFlinkSchemaUtil.java  |  70 +++++++++--
 3 files changed, 245 insertions(+), 12 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
index d4dd61d..90534d7 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -22,10 +22,29 @@ package org.apache.iceberg.flink;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
 
+/**
+ * Converter between Flink types and Iceberg types.
+ * The conversion is not a 1:1 mapping, so some information might get lost
+ * during a back-and-forth conversion.
+ * <p>
+ * The inconsistent mappings are:
+ * <ul>
+ *   <li>Iceberg UUID type maps to Flink BinaryType(16)</li>
+ *   <li>Flink VarCharType(_) and CharType(_) map to Iceberg String type</li>
+ *   <li>Flink VarBinaryType(_) maps to Iceberg Binary type</li>
+ *   <li>Flink TimeType(_) maps to Iceberg Time type (microseconds)</li>
+ *   <li>Flink TimestampType(_) maps to Iceberg Timestamp without zone type (microseconds)</li>
+ *   <li>Flink LocalZonedTimestampType(_) maps to Iceberg Timestamp with zone type (microseconds)</li>
+ *   <li>Flink MultiSetType(element) maps to Iceberg Map type (element, int)</li>
+ * </ul>
+ */
 public class FlinkSchemaUtil {
 
   private FlinkSchemaUtil() {
@@ -43,4 +62,40 @@ public class FlinkSchemaUtil {
 
     return new Schema(converted.asStructType().fields());
   }
+
+  /**
+   * Convert a {@link Schema} to a {@link RowType Flink type}.
+   *
+   * @param schema a Schema
+   * @return the equivalent Flink type
+   * @throws IllegalArgumentException if the type cannot be converted to Flink
+   */
+  public static RowType convert(Schema schema) {
+    return (RowType) TypeUtil.visit(schema, new TypeToFlinkType());
+  }
+
+  /**
+   * Convert a {@link Type} to a {@link LogicalType Flink type}.
+   *
+   * @param type a Type
+   * @return the equivalent Flink type
+   * @throws IllegalArgumentException if the type cannot be converted to Flink
+   */
+  public static LogicalType convert(Type type) {
+    return TypeUtil.visit(type, new TypeToFlinkType());
+  }
+
+  /**
+   * Convert a {@link RowType} to a {@link TableSchema}.
+   *
+   * @param rowType a RowType
+   * @return Flink TableSchema
+   */
+  public static TableSchema toSchema(RowType rowType) {
+    TableSchema.Builder builder = TableSchema.builder();
+    for (RowType.RowField field : rowType.getFields()) {
+      builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
+    }
+    return builder.build();
+  }
 }
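
A minimal usage sketch (not part of the patch above) of how the new convert(Schema)
and toSchema(RowType) compose with the pre-existing convert(TableSchema) to round-trip
a schema. The class FlinkSchemaUtilExample and the field names are hypothetical; the
field IDs start at 0 so the IDs reassigned on the way back match the originals.

    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.types.Types;

    public class FlinkSchemaUtilExample {
      public static void main(String[] args) {
        Schema iceberg = new Schema(
            Types.NestedField.required(0, "id", Types.LongType.get()),
            Types.NestedField.optional(1, "event_ts", Types.TimestampType.withZone()));

        // Iceberg -> Flink: timestamptz becomes LocalZonedTimestampType(6).
        RowType rowType = FlinkSchemaUtil.convert(iceberg);

        // Flink RowType -> TableSchema, e.g. for registering the table with Flink.
        TableSchema tableSchema = FlinkSchemaUtil.toSchema(rowType);

        // Round trip back to Iceberg; compare structs, as the tests below do.
        Schema roundTripped = FlinkSchemaUtil.convert(tableSchema);
        System.out.println(iceberg.asStruct().equals(roundTripped.asStruct()));  // expected: true
      }
    }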
diff --git a/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java b/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
new file mode 100644
index 0000000..dfd8ffb
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink;
+
+import java.util.List;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+
+class TypeToFlinkType extends TypeUtil.SchemaVisitor<LogicalType> {
+  TypeToFlinkType() {
+  }
+
+  @Override
+  public LogicalType schema(Schema schema, LogicalType structType) {
+    return structType;
+  }
+
+  @Override
+  public LogicalType struct(Types.StructType struct, List<LogicalType> fieldResults) {
+    List<Types.NestedField> fields = struct.fields();
+
+    List<RowType.RowField> flinkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      LogicalType type = fieldResults.get(i);
+      RowType.RowField flinkField = new RowType.RowField(
+          field.name(), type.copy(field.isOptional()), field.doc());
+      flinkFields.add(flinkField);
+    }
+
+    return new RowType(flinkFields);
+  }
+
+  @Override
+  public LogicalType field(Types.NestedField field, LogicalType fieldResult) {
+    return fieldResult;
+  }
+
+  @Override
+  public LogicalType list(Types.ListType list, LogicalType elementResult) {
+    return new ArrayType(elementResult.copy(list.isElementOptional()));
+  }
+
+  @Override
+  public LogicalType map(Types.MapType map, LogicalType keyResult, LogicalType valueResult) {
+    // keys in map are not allowed to be null.
+    return new MapType(keyResult.copy(false), valueResult.copy(map.isValueOptional()));
+  }
+
+  @Override
+  public LogicalType primitive(Type.PrimitiveType primitive) {
+    switch (primitive.typeId()) {
+      case BOOLEAN:
+        return new BooleanType();
+      case INTEGER:
+        return new IntType();
+      case LONG:
+        return new BigIntType();
+      case FLOAT:
+        return new FloatType();
+      case DOUBLE:
+        return new DoubleType();
+      case DATE:
+        return new DateType();
+      case TIME:
+        // MICROS
+        return new TimeType(6);
+      case TIMESTAMP:
+        Types.TimestampType timestamp = (Types.TimestampType) primitive;
+        if (timestamp.shouldAdjustToUTC()) {
+          // MICROS
+          return new LocalZonedTimestampType(6);
+        } else {
+          // MICROS
+          return new TimestampType(6);
+        }
+      case STRING:
+        return new VarCharType(VarCharType.MAX_LENGTH);
+      case UUID:
+        // UUID length is 16
+        return new BinaryType(16);
+      case FIXED:
+        Types.FixedType fixedType = (Types.FixedType) primitive;
+        return new BinaryType(fixedType.length());
+      case BINARY:
+        return new VarBinaryType(VarBinaryType.MAX_LENGTH);
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) primitive;
+        return new DecimalType(decimal.precision(), decimal.scale());
+      default:
+        throw new UnsupportedOperationException(
+            "Cannot convert unknown type to Flink: " + primitive);
+    }
+  }
+}
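
A minimal sketch (not part of the patch above) of the visitor in action through its
public entry point; TypeToFlinkType itself is package-private, so callers go through
FlinkSchemaUtil.convert(Type). The class name TypeToFlinkTypeExample is hypothetical.

    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    public class TypeToFlinkTypeExample {
      public static void main(String[] args) {
        // An Iceberg map<string, list<uuid>> with required list elements.
        Type icebergType = Types.MapType.ofOptional(1, 2,
            Types.StringType.get(),
            Types.ListType.ofRequired(3, Types.UUIDType.get()));

        // string maps to VARCHAR(Integer.MAX_VALUE) and uuid to BINARY(16); the
        // visitor also marks the map key NOT NULL, since map keys cannot be null.
        LogicalType flinkType = FlinkSchemaUtil.convert(icebergType);
        System.out.println(flinkType);
      }
    }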
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
index c7c00c1..320d3bb 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java
@@ -21,7 +21,17 @@ package org.apache.iceberg.flink;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -57,8 +67,7 @@ public class TestFlinkSchemaUtil {
         .field("multiset", DataTypes.MULTISET(DataTypes.STRING().notNull()))
         .build();
 
-    Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
-    Schema expectedSchema = new Schema(
+    Schema icebergSchema = new Schema(
         Types.NestedField.required(0, "id", Types.IntegerType.get(), null),
         Types.NestedField.optional(1, "name", Types.StringType.get(), null),
         Types.NestedField.required(2, "salary", Types.DoubleType.get(), null),
@@ -90,7 +99,7 @@ public class TestFlinkSchemaUtil {
             Types.IntegerType.get()))
     );
 
-    Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+    checkSchema(flinkSchema, icebergSchema);
   }
 
   @Test
@@ -112,8 +121,7 @@ public class TestFlinkSchemaUtil {
         )
         .build();
 
-    Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
-    Schema expectedSchema = new Schema(
+    Schema icebergSchema = new Schema(
         Types.NestedField.required(0, "map_int_long",
             Types.MapType.ofOptional(4, 5, Types.IntegerType.get(), Types.LongType.get()), null),
         Types.NestedField.optional(1, "map_int_array_string",
@@ -132,7 +140,7 @@ public class TestFlinkSchemaUtil {
         )
     );
 
-    Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+    checkSchema(flinkSchema, icebergSchema);
   }
 
   @Test
@@ -152,8 +160,7 @@ public class TestFlinkSchemaUtil {
         ).nullable()) /* Optional */
         .build();
 
-    Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
-    Schema expectedSchema = new Schema(
+    Schema icebergSchema = new Schema(
         Types.NestedField.required(0, "struct_int_string_decimal",
             Types.StructType.of(
                 Types.NestedField.optional(5, "field_int", Types.IntegerType.get()),
@@ -173,7 +180,8 @@ public class TestFlinkSchemaUtil {
             )
         )
     );
-    Assert.assertEquals(actualSchema.asStruct(), expectedSchema.asStruct());
+
+    checkSchema(flinkSchema, icebergSchema);
   }
 
   @Test
@@ -201,8 +209,7 @@ public class TestFlinkSchemaUtil {
         ).notNull()) /* Required */
         .build();
 
-    Schema actualSchema = FlinkSchemaUtil.convert(flinkSchema);
-    Schema expectedSchema = new Schema(
+    Schema icebergSchema = new Schema(
         Types.NestedField.required(0, "list_struct_fields",
             Types.ListType.ofOptional(4, Types.StructType.of(
                 Types.NestedField.optional(3, "field_int", Types.IntegerType.get())
@@ -222,6 +229,45 @@ public class TestFlinkSchemaUtil {
             ))
     );
 
-    Assert.assertEquals(expectedSchema.asStruct(), actualSchema.asStruct());
+    checkSchema(flinkSchema, icebergSchema);
+  }
+
+  private void checkSchema(TableSchema flinkSchema, Schema icebergSchema) {
+    Assert.assertEquals(icebergSchema.asStruct(), FlinkSchemaUtil.convert(flinkSchema).asStruct());
+    // The conversion is not a 1:1 mapping, so we only compare the Iceberg types after a round trip.
+    Assert.assertEquals(
+        icebergSchema.asStruct(),
+        FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema))).asStruct());
+  }
+
+  @Test
+  public void testInconsistentTypes() {
+    checkInconsistentType(
+        Types.UUIDType.get(), new BinaryType(16),
+        new BinaryType(16), Types.FixedType.ofLength(16));
+    checkInconsistentType(
+        Types.StringType.get(), new VarCharType(VarCharType.MAX_LENGTH),
+        new CharType(100), Types.StringType.get());
+    checkInconsistentType(
+        Types.BinaryType.get(), new VarBinaryType(VarBinaryType.MAX_LENGTH),
+        new VarBinaryType(100), Types.BinaryType.get());
+    checkInconsistentType(
+        Types.TimeType.get(), new TimeType(6),
+        new TimeType(3), Types.TimeType.get());
+    checkInconsistentType(
+        Types.TimestampType.withoutZone(), new TimestampType(6),
+        new TimestampType(3), Types.TimestampType.withoutZone());
+    checkInconsistentType(
+        Types.TimestampType.withZone(), new LocalZonedTimestampType(6),
+        new LocalZonedTimestampType(3), Types.TimestampType.withZone());
+  }
+
+  private void checkInconsistentType(
+      Type icebergType, LogicalType flinkExpectedType,
+      LogicalType flinkType, Type icebergExpectedType) {
+    Assert.assertEquals(flinkExpectedType, FlinkSchemaUtil.convert(icebergType));
+    Assert.assertEquals(
+        Types.StructType.of(Types.NestedField.optional(0, "f0", icebergExpectedType)),
+        FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(RowType.of(flinkType))).asStruct());
   }
 }
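
A minimal sketch (not part of the patch above) of the precision normalization that
testInconsistentTypes verifies: any Flink TIMESTAMP(p) converts to Iceberg's
microsecond timestamp, so it comes back as TIMESTAMP(6). The class name
PrecisionLossExample is hypothetical.

    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.TimestampType;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.FlinkSchemaUtil;

    public class PrecisionLossExample {
      public static void main(String[] args) {
        // A single-field Flink row with a millisecond-precision timestamp.
        RowType original = RowType.of(new TimestampType(3));

        // Flink -> Iceberg: Iceberg timestamps are always microsecond precision.
        Schema iceberg = FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(original));

        // Iceberg -> Flink: precision comes back as 6; TIMESTAMP(3) is not recovered.
        System.out.println(FlinkSchemaUtil.convert(iceberg));
      }
    }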
