This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new adaf020  [FLINK-21396][table-common] Improve usability of new schema hierarchy
adaf020 is described below

commit adaf020d24806b3e765298a613008b728a00c2ce
Author: Timo Walther <twal...@apache.org>
AuthorDate: Fri Mar 5 10:12:54 2021 +0100

    [FLINK-21396][table-common] Improve usability of new schema hierarchy
    
    This closes #15096.
---
 .../flink/table/catalog/SchemaResolutionTest.java  | 15 +++-
 .../java/org/apache/flink/table/api/Schema.java    | 60 ++++++++++++++-
 .../org/apache/flink/table/api/TableSchema.java    | 87 ++++++++++++++++++++++
 .../org/apache/flink/table/catalog/Column.java     | 54 ++++----------
 .../apache/flink/table/catalog/ResolvedSchema.java |  2 +-
 5 files changed, 172 insertions(+), 46 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 3e7c029..7ff5791 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -95,9 +95,10 @@ public class SchemaResolutionTest {
                                                 DataTypes.FIELD("name", 
DataTypes.STRING()),
                                                 DataTypes.FIELD("age", 
DataTypes.INT()),
                                                 DataTypes.FIELD("flag", 
DataTypes.BOOLEAN()))),
-                                Column.metadata("topic", DataTypes.STRING(), 
true),
+                                Column.metadata("topic", DataTypes.STRING(), 
null, true),
                                 Column.computed("ts", 
COMPUTED_COLUMN_RESOLVED),
-                                Column.metadata("orig_ts", 
DataTypes.TIMESTAMP(3), "timestamp"),
+                                Column.metadata(
+                                        "orig_ts", DataTypes.TIMESTAMP(3), 
"timestamp", false),
                                 Column.computed("proctime", 
PROCTIME_RESOLVED)),
                         Collections.singletonList(new WatermarkSpec("ts", 
WATERMARK_RESOLVED)),
                         UniqueConstraint.primaryKey(
@@ -291,7 +292,7 @@ public class SchemaResolutionTest {
 
     @Test
     public void testSourceRowDataType() {
-        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA, true, 
true);
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
         final DataType expectedDataType =
                 DataTypes.ROW(
                                 DataTypes.FIELD("id", 
DataTypes.INT().notNull()),
@@ -310,6 +311,14 @@ public class SchemaResolutionTest {
         assertThat(resolvedSchema.toSourceRowDataType(), 
equalTo(expectedDataType));
     }
 
+    @Test
+    public void testLegacySchemaCompatibility() {
+        final ResolvedSchema resolvedSchema = resolveSchema(SCHEMA);
+        final ResolvedSchema resolvedSchemaFromLegacy =
+                
resolveSchema(TableSchema.fromResolvedSchema(resolvedSchema).toSchema());
+        assertThat(resolvedSchemaFromLegacy, equalTo(resolvedSchema));
+    }
+
     // --------------------------------------------------------------------------------------------
 
     private static void testError(Schema schema, String errorMessage) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
index d749a81..46e9940 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
@@ -302,6 +302,23 @@ public final class Schema {
         /**
          * Declares a metadata column that is appended to this schema.
          *
+         * <p>See {@link #columnByMetadata(String, AbstractDataType, boolean)} 
for a detailed
+         * explanation.
+         *
+         * <p>This method uses a type string that can be easily persisted in a 
durable catalog.
+         *
+         * @param columnName column name
+         * @param serializableTypeString data type of the column
+         * @param isVirtual whether the column should be persisted or not
+         */
+        public Builder columnByMetadata(
+                String columnName, String serializableTypeString, boolean 
isVirtual) {
+            return columnByMetadata(columnName, 
DataTypes.of(serializableTypeString), isVirtual);
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
          * <p>Metadata columns allow to access connector and/or format 
specific fields for every row
          * of a table. For example, a metadata column can be used to read and 
write the timestamp
          * from and to Kafka records for time-based operations. The connector 
and format
@@ -332,6 +349,24 @@ public final class Schema {
         /**
          * Declares a metadata column that is appended to this schema.
          *
+         * <p>See {@link #columnByMetadata(String, AbstractDataType, String)} 
for a detailed
+         * explanation.
+         *
+         * <p>This method uses a type string that can be easily persisted in a 
durable catalog.
+         *
+         * @param columnName column name
+         * @param serializableTypeString data type of the column
+         * @param metadataKey identifying metadata key, if null the column 
name will be used as
+         *     metadata key
+         */
+        public Builder columnByMetadata(
+                String columnName, String serializableTypeString, @Nullable 
String metadataKey) {
+            return columnByMetadata(columnName, 
DataTypes.of(serializableTypeString), metadataKey);
+        }
+
+        /**
+         * Declares a metadata column that is appended to this schema.
+         *
          * <p>Metadata columns allow to access connector and/or format 
specific fields for every row
          * of a table. For example, a metadata column can be used to read and 
write the timestamp
          * from and to Kafka records for time-based operations. The connector 
and format
@@ -365,6 +400,29 @@ public final class Schema {
         }
 
         /**
+         * Declares a metadata column that is appended to this schema.
+         *
+         * <p>See {@link #columnByMetadata(String, AbstractDataType, String, 
boolean)} for a
+         * detailed explanation.
+         *
+         * <p>This method uses a type string that can be easily persisted in a 
durable catalog.
+         *
+         * @param columnName column name
+         * @param serializableTypeString data type of the column
+         * @param metadataKey identifying metadata key, if null the column 
name will be used as
+         *     metadata key
+         * @param isVirtual whether the column should be persisted or not
+         */
+        public Builder columnByMetadata(
+                String columnName,
+                String serializableTypeString,
+                @Nullable String metadataKey,
+                boolean isVirtual) {
+            return columnByMetadata(
+                    columnName, DataTypes.of(serializableTypeString), 
metadataKey, isVirtual);
+        }
+
+        /**
          * Declares that the given column should serve as an event-time (i.e. 
rowtime) attribute and
          * specifies a corresponding watermark strategy as an expression.
          *
@@ -502,7 +560,7 @@ public final class Schema {
                             columnByMetadata(
                                     metadataColumn.getName(),
                                     metadataColumn.getDataType(),
-                                    
metadataColumn.getMetadataAlias().orElse(null),
+                                    
metadataColumn.getMetadataKey().orElse(null),
                                     metadataColumn.isVirtual());
                         }
                     });
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
index af08a58..412ca5f 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
@@ -21,7 +21,12 @@ package org.apache.flink.table.api;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.table.api.TableColumn.ComputedColumn;
+import org.apache.flink.table.api.TableColumn.MetadataColumn;
+import org.apache.flink.table.api.TableColumn.PhysicalColumn;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LegacyTypeInformationType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -298,6 +303,40 @@ public class TableSchema {
         return Optional.ofNullable(primaryKey);
     }
 
+    /** Helps to migrate to the new {@link Schema} class. */
+    public Schema toSchema() {
+        final Schema.Builder builder = Schema.newBuilder();
+
+        columns.forEach(
+                column -> {
+                    if (column instanceof PhysicalColumn) {
+                        final PhysicalColumn c = (PhysicalColumn) column;
+                        builder.column(c.getName(), c.getType());
+                    } else if (column instanceof MetadataColumn) {
+                        final MetadataColumn c = (MetadataColumn) column;
+                        builder.columnByMetadata(
+                                c.getName(),
+                                c.getType(),
+                                c.getMetadataAlias().orElse(null),
+                                c.isVirtual());
+                    } else if (column instanceof ComputedColumn) {
+                        final ComputedColumn c = (ComputedColumn) column;
+                        builder.columnByExpression(c.getName(), 
c.getExpression());
+                    } else {
+                        throw new IllegalArgumentException("Unsupported column 
type: " + column);
+                    }
+                });
+
+        watermarkSpecs.forEach(
+                spec -> builder.watermark(spec.getRowtimeAttribute(), 
spec.getWatermarkExpr()));
+
+        if (primaryKey != null) {
+            builder.primaryKeyNamed(primaryKey.getName(), 
primaryKey.getColumns());
+        }
+
+        return builder.build();
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder();
@@ -369,6 +408,54 @@ public class TableSchema {
         }
     }
 
+    /** Helps to migrate to the new {@link ResolvedSchema} to old API methods. */
+    public static TableSchema fromResolvedSchema(ResolvedSchema 
resolvedSchema) {
+        final TableSchema.Builder builder = TableSchema.builder();
+
+        resolvedSchema.getColumns().stream()
+                .map(
+                        column -> {
+                            if (column instanceof Column.PhysicalColumn) {
+                                final Column.PhysicalColumn c = 
(Column.PhysicalColumn) column;
+                                return TableColumn.physical(c.getName(), 
c.getDataType());
+                            } else if (column instanceof 
Column.MetadataColumn) {
+                                final Column.MetadataColumn c = 
(Column.MetadataColumn) column;
+                                return TableColumn.metadata(
+                                        c.getName(),
+                                        c.getDataType(),
+                                        c.getMetadataKey().orElse(null),
+                                        c.isVirtual());
+                            } else if (column instanceof 
Column.ComputedColumn) {
+                                final Column.ComputedColumn c = 
(Column.ComputedColumn) column;
+                                return TableColumn.computed(
+                                        c.getName(),
+                                        c.getDataType(),
+                                        
c.getExpression().asSerializableString());
+                            }
+                            throw new IllegalArgumentException(
+                                    "Unsupported column type: " + column);
+                        })
+                .forEach(builder::add);
+
+        resolvedSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        spec ->
+                                builder.watermark(
+                                        spec.getRowtimeAttribute(),
+                                        
spec.getWatermarkExpression().asSerializableString(),
+                                        
spec.getWatermarkExpression().getOutputDataType()));
+
+        resolvedSchema
+                .getPrimaryKey()
+                .ifPresent(
+                        pk ->
+                                builder.primaryKey(
+                                        pk.getName(), 
pk.getColumns().toArray(new String[0])));
+
+        return builder.build();
+    }
+
     public static Builder builder() {
         return new Builder();
     }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
index 019a9ef..73658c4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Column.java
@@ -65,44 +65,16 @@ public abstract class Column {
     }
 
     /**
-     * Creates a metadata column from metadata of the given column name.
-     *
-     * <p>The column is not virtual by default.
-     */
-    public static MetadataColumn metadata(String name, DataType dataType) {
-        return metadata(name, dataType, null, false);
-    }
-
-    /**
-     * Creates a metadata column from metadata of the given column name.
-     *
-     * <p>Allows to specify whether the column is virtual or not.
-     */
-    public static MetadataColumn metadata(String name, DataType type, boolean 
isVirtual) {
-        return metadata(name, type, null, isVirtual);
-    }
-
-    /**
-     * Creates a metadata column from metadata of the given alias.
-     *
-     * <p>The column is not virtual by default.
-     */
-    public static MetadataColumn metadata(String name, DataType type, String 
metadataAlias) {
-        Preconditions.checkNotNull(metadataAlias, "Metadata alias can not be 
null.");
-        return metadata(name, type, metadataAlias, false);
-    }
-
-    /**
      * Creates a metadata column from metadata of the given column name or 
from metadata of the
-     * given alias (if not null).
+     * given key (if not null).
      *
      * <p>Allows to specify whether the column is virtual or not.
      */
     public static MetadataColumn metadata(
-            String name, DataType dataType, @Nullable String metadataAlias, 
boolean isVirtual) {
+            String name, DataType dataType, @Nullable String metadataKey, 
boolean isVirtual) {
         Preconditions.checkNotNull(name, "Column name can not be null.");
         Preconditions.checkNotNull(dataType, "Column data type can not be 
null.");
-        return new MetadataColumn(name, dataType, metadataAlias, isVirtual);
+        return new MetadataColumn(name, dataType, metadataKey, isVirtual);
     }
 
     /**
@@ -257,14 +229,14 @@ public abstract class Column {
     /** Representation of a metadata column. */
     public static final class MetadataColumn extends Column {
 
-        private final @Nullable String metadataAlias;
+        private final @Nullable String metadataKey;
 
         private final boolean isVirtual;
 
         private MetadataColumn(
-                String name, DataType dataType, @Nullable String 
metadataAlias, boolean isVirtual) {
+                String name, DataType dataType, @Nullable String metadataKey, 
boolean isVirtual) {
             super(name, dataType);
-            this.metadataAlias = metadataAlias;
+            this.metadataKey = metadataKey;
             this.isVirtual = isVirtual;
         }
 
@@ -272,8 +244,8 @@ public abstract class Column {
             return isVirtual;
         }
 
-        public Optional<String> getMetadataAlias() {
-            return Optional.ofNullable(metadataAlias);
+        public Optional<String> getMetadataKey() {
+            return Optional.ofNullable(metadataKey);
         }
 
         @Override
@@ -290,10 +262,10 @@ public abstract class Column {
         public Optional<String> explainExtras() {
             final StringBuilder sb = new StringBuilder();
             sb.append("METADATA");
-            if (metadataAlias != null) {
+            if (metadataKey != null) {
                 sb.append(" FROM ");
                 sb.append("'");
-                sb.append(EncodingUtils.escapeSingleQuotes(metadataAlias));
+                sb.append(EncodingUtils.escapeSingleQuotes(metadataKey));
                 sb.append("'");
             }
             if (isVirtual) {
@@ -304,7 +276,7 @@ public abstract class Column {
 
         @Override
         public Column copy(DataType newDataType) {
-            return new MetadataColumn(name, newDataType, metadataAlias, 
isVirtual);
+            return new MetadataColumn(name, newDataType, metadataKey, 
isVirtual);
         }
 
         @Override
@@ -319,12 +291,12 @@ public abstract class Column {
                 return false;
             }
             MetadataColumn that = (MetadataColumn) o;
-            return isVirtual == that.isVirtual && 
Objects.equals(metadataAlias, that.metadataAlias);
+            return isVirtual == that.isVirtual && Objects.equals(metadataKey, 
that.metadataKey);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(super.hashCode(), metadataAlias, isVirtual);
+            return Objects.hash(super.hashCode(), metadataKey, isVirtual);
         }
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index 1a1ec01..417bfa6 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -57,7 +57,7 @@ public final class ResolvedSchema {
     private final List<WatermarkSpec> watermarkSpecs;
     private final @Nullable UniqueConstraint primaryKey;
 
-    ResolvedSchema(
+    public ResolvedSchema(
             List<Column> columns,
             List<WatermarkSpec> watermarkSpecs,
             @Nullable UniqueConstraint primaryKey) {

Reply via email to