This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cecbe3ce295d44e8c712e68385f75cda339fff79
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu Aug 1 16:41:41 2019 +0200

    [hotfix][table-common] Add utilities for modifying (nested) types
---
 .../table/client/gateway/local/LocalExecutor.java  |  17 +--
 .../types/logical/utils/LogicalTypeDuplicator.java | 133 +++++++++++++++++
 .../types/logical/utils/LogicalTypeUtils.java      |  68 +++++++++
 .../flink/table/types/utils/DataTypeUtils.java     |  42 ++++++
 .../table/types/LogicalTypeDuplicatorTest.java     | 161 +++++++++++++++++++++
 5 files changed, 411 insertions(+), 10 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 5d91964..a7fd6b1 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -19,8 +19,6 @@
 package org.apache.flink.table.client.gateway.local;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
@@ -41,7 +39,6 @@ import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.gateway.Executor;
@@ -54,6 +51,9 @@ import 
org.apache.flink.table.client.gateway.local.result.BasicResult;
 import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.StringUtils;
 
@@ -611,13 +611,10 @@ public class LocalExecutor implements Executor {
        private static TableSchema removeTimeAttributes(TableSchema schema) {
                final TableSchema.Builder builder = TableSchema.builder();
                for (int i = 0; i < schema.getFieldCount(); i++) {
-                       final TypeInformation<?> type = 
schema.getFieldTypes()[i];
-                       final TypeInformation<?> convertedType;
-                       if (FlinkTypeFactory.isTimeIndicatorType(type)) {
-                               convertedType = Types.SQL_TIMESTAMP;
-                       } else {
-                               convertedType = type;
-                       }
+                       final DataType dataType = schema.getFieldDataTypes()[i];
+                       final DataType convertedType = 
DataTypeUtils.replaceLogicalType(
+                               dataType,
+                               
LogicalTypeUtils.removeTimeAttributes(dataType.getLogicalType()));
                        builder.field(schema.getFieldNames()[i], convertedType);
                }
                return builder.build();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
new file mode 100644
index 0000000..e417530
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeDuplicator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.RowType.RowField;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.StructuredType.StructuredAttribute;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Returns a deep copy of a {@link LogicalType}.
+ *
+ * <p>It also enables replacing children of possibly nested structures by 
overwriting corresponding
+ * {@code visit()} methods.
+ */
+@Internal
+public class LogicalTypeDuplicator extends 
LogicalTypeDefaultVisitor<LogicalType> {
+
+       @Override
+       public LogicalType visit(ArrayType arrayType) {
+               return new ArrayType(
+                       arrayType.isNullable(),
+                       arrayType.getElementType().accept(this));
+       }
+
+       @Override
+       public LogicalType visit(MultisetType multisetType) {
+               return new MultisetType(
+                       multisetType.isNullable(),
+                       multisetType.getElementType().accept(this));
+       }
+
+       @Override
+       public LogicalType visit(MapType mapType) {
+               return new MapType(
+                       mapType.isNullable(),
+                       mapType.getKeyType().accept(this),
+                       mapType.getValueType().accept(this));
+       }
+
+       @Override
+       public LogicalType visit(RowType rowType) {
+               final List<RowField> fields = rowType.getFields().stream()
+                       .map(f -> {
+                               if (f.getDescription().isPresent()) {
+                                       return new RowField(
+                                               f.getName(),
+                                               f.getType().accept(this),
+                                               f.getDescription().get());
+                               }
+                               return new RowField(f.getName(), 
f.getType().accept(this));
+                       })
+                       .collect(Collectors.toList());
+
+               return new RowType(
+                       rowType.isNullable(),
+                       fields);
+       }
+
+       @Override
+       public LogicalType visit(DistinctType distinctType) {
+               final DistinctType.Builder builder = new DistinctType.Builder(
+                       distinctType.getObjectIdentifier(),
+                       distinctType.getSourceType().accept(this));
+               
distinctType.getDescription().ifPresent(builder::setDescription);
+               return builder.build();
+       }
+
+       @Override
+       public LogicalType visit(StructuredType structuredType) {
+               final List<StructuredAttribute> attributes = 
structuredType.getAttributes().stream()
+                       .map(a -> {
+                               if (a.getDescription().isPresent()) {
+                                       return new StructuredAttribute(
+                                               a.getName(),
+                                               a.getType().accept(this),
+                                               a.getDescription().get());
+                               }
+                               return new StructuredAttribute(
+                                       a.getName(),
+                                       a.getType().accept(this));
+                       })
+                       .collect(Collectors.toList());
+               final StructuredType.Builder builder = new 
StructuredType.Builder(
+                       structuredType.getObjectIdentifier(),
+                       attributes);
+               builder.setNullable(structuredType.isNullable());
+               builder.setFinal(structuredType.isFinal());
+               builder.setInstantiable(structuredType.isInstantiable());
+               builder.setComparision(structuredType.getComparision());
+               structuredType.getSuperType().ifPresent(st -> {
+                       final LogicalType visited = st.accept(this);
+                       if (!(visited instanceof StructuredType)) {
+                               throw new TableException("Unexpected super 
type. Structured type expected but was: " + visited);
+                       }
+                       builder.setSuperType((StructuredType) visited);
+               });
+               
structuredType.getDescription().ifPresent(builder::setDescription);
+               
structuredType.getImplementationClass().ifPresent(builder::setImplementationClass);
+               return builder.build();
+       }
+
+       @Override
+       protected LogicalType defaultMethod(LogicalType logicalType) {
+               return logicalType.copy();
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
new file mode 100644
index 0000000..cde6068
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.logical.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+/**
+ * Utilities for handling {@link LogicalType}s.
+ */
+@Internal
+public final class LogicalTypeUtils {
+
+       private static final TimeAttributeRemover TIME_ATTRIBUTE_REMOVER = new 
TimeAttributeRemover();
+
+       public static LogicalType removeTimeAttributes(LogicalType logicalType) 
{
+               return logicalType.accept(TIME_ATTRIBUTE_REMOVER);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class TimeAttributeRemover extends LogicalTypeDuplicator 
{
+
+               @Override
+               public LogicalType visit(TimestampType timestampType) {
+                       return new TimestampType(
+                               timestampType.isNullable(),
+                               timestampType.getPrecision());
+               }
+
+               @Override
+               public LogicalType visit(ZonedTimestampType zonedTimestampType) 
{
+                       return new ZonedTimestampType(
+                               zonedTimestampType.isNullable(),
+                               zonedTimestampType.getPrecision());
+               }
+
+               @Override
+               public LogicalType visit(LocalZonedTimestampType 
localZonedTimestampType) {
+                       return new LocalZonedTimestampType(
+                               localZonedTimestampType.isNullable(),
+                               localZonedTimestampType.getPrecision());
+               }
+       }
+
+       private LogicalTypeUtils() {
+               // no instantiation
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
new file mode 100644
index 0000000..43ca734
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.utils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/**
+ * Utilities for handling {@link DataType}s.
+ */
+@Internal
+public final class DataTypeUtils {
+
+       /**
+        * Replaces the {@link LogicalType} of a {@link DataType}, i.e., it 
keeps the bridging class.
+        */
+       public static DataType replaceLogicalType(DataType dataType, 
LogicalType replacement) {
+               return LogicalTypeDataTypeConverter.toDataType(replacement)
+                       .bridgedTo(dataType.getConversionClass());
+       }
+
+       private DataTypeUtils() {
+               // no instantiation
+       }
+}
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
new file mode 100644
index 0000000..ef7b3a8
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeDuplicatorTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types;
+
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeDuplicator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link LogicalTypeDuplicator}.
+ */
+@RunWith(Parameterized.class)
+public class LogicalTypeDuplicatorTest {
+
+       private static final LogicalTypeDuplicator DUPLICATOR = new 
LogicalTypeDuplicator();
+
+       private static final LogicalTypeDuplicator INT_REPLACER = new 
IntReplacer();
+
+       @Parameters(name = "{index}: {0}")
+       public static List<Object[]> testData() {
+               return Arrays.asList(
+                       new Object[][]{
+                               {new CharType(2), new CharType(2)},
+                               {createMultisetType(new IntType()), 
createMultisetType(new BigIntType())},
+                               {createArrayType(new IntType()), 
createArrayType(new BigIntType())},
+                               {createMapType(new IntType()), 
createMapType(new BigIntType())},
+                               {createRowType(new IntType()), 
createRowType(new BigIntType())},
+                               {createDistinctType(new IntType()), 
createDistinctType(new BigIntType())},
+                               {createUserType(new IntType()), 
createUserType(new BigIntType())},
+                               {createHumanType(), createHumanType()}
+                       }
+               );
+       }
+
+       @Parameter
+       public LogicalType logicalType;
+
+       @Parameter(1)
+       public LogicalType replacedLogicalType;
+
+       @Test
+       public void testDuplication() {
+               assertThat(logicalType.accept(DUPLICATOR), 
equalTo(logicalType));
+       }
+
+       @Test
+       public void testReplacement() {
+               assertThat(logicalType.accept(INT_REPLACER), 
equalTo(replacedLogicalType));
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+
+       private static class IntReplacer extends LogicalTypeDuplicator {
+               @Override
+               public LogicalType visit(IntType intType) {
+                       return new BigIntType();
+               }
+       }
+
+       private static MultisetType createMultisetType(LogicalType 
replacedType) {
+               return new MultisetType(new MultisetType(replacedType));
+       }
+
+       private static ArrayType createArrayType(LogicalType replacedType) {
+               return new ArrayType(new ArrayType(replacedType));
+       }
+
+       private static MapType createMapType(LogicalType replacedType) {
+               return new MapType(replacedType, new SmallIntType());
+       }
+
+       private static DistinctType createDistinctType(LogicalType 
replacedType) {
+               return new DistinctType.Builder(
+                               ObjectIdentifier.of("cat", "db", "Money"),
+                               replacedType)
+                       .setDescription("Money type desc.")
+                       .build();
+       }
+
+       private static RowType createRowType(LogicalType replacedType) {
+               return new RowType(
+                       Arrays.asList(
+                               new RowType.RowField("field1", new CharType(2)),
+                               new RowType.RowField("field2", new 
BooleanType()),
+                               new RowType.RowField("field3", replacedType)));
+       }
+
+       private static StructuredType createHumanType() {
+               return new StructuredType.Builder(
+                               ObjectIdentifier.of("cat", "db", "Human"),
+                               Collections.singletonList(
+                                       new 
StructuredType.StructuredAttribute("name", new VarCharType(), "Description.")))
+                       .setDescription("Human type desc.")
+                       .setFinal(false)
+                       .setInstantiable(false)
+                       .setImplementationClass(Human.class)
+                       .build();
+       }
+
+       private static StructuredType createUserType(LogicalType replacedType) {
+               return new StructuredType.Builder(
+                               ObjectIdentifier.of("cat", "db", "User"),
+                               Collections.singletonList(
+                                       new 
StructuredType.StructuredAttribute("setting", replacedType)))
+                       .setDescription("User type desc.")
+                       .setFinal(false)
+                       .setInstantiable(true)
+                       .setImplementationClass(User.class)
+                       .setSuperType(createHumanType())
+                       .build();
+       }
+
+       private abstract static class Human {
+               public String name;
+       }
+
+       private static final class User extends Human {
+               public int setting;
+       }
+}

Reply via email to