This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6fb887f47f [flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties (#4611)
6fb887f47f is described below
commit 6fb887f47f2e79f6b3142f094b20b6d7a3f86846
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Dec 2 21:11:23 2024 +0800
[flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties (#4611)
---
.../org/apache/paimon/flink/DataCatalogTable.java | 115 ++++++++++++------
.../java/org/apache/paimon/flink/FlinkCatalog.java | 55 +++++----
.../apache/paimon/flink/FlinkGenericCatalog.java | 6 -
.../apache/paimon/flink/SystemCatalogTable.java | 12 +-
.../flink/utils/FlinkCatalogPropertiesUtil.java | 102 +++++-----------
.../flink/utils/FlinkDescriptorProperties.java | 99 ++++++++++++++++
.../flink/FlinkCatalogPropertiesUtilTest.java | 130 +++++++++++++++------
.../org/apache/paimon/flink/FlinkCatalogTest.java | 9 +-
8 files changed, 342 insertions(+), 186 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
index 019d7bd689..e141581b47 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java
@@ -23,33 +23,55 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
-import org.apache.flink.table.catalog.CatalogTableImpl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
-/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
-public class DataCatalogTable extends CatalogTableImpl {
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** A {@link CatalogTable} to wrap {@link FileStoreTable}. */
+public class DataCatalogTable implements CatalogTable {
+ // Schema of the table (column names and types)
+ private final Schema schema;
+
+ // Partition keys if this is a partitioned table. It's an empty set if the table is not
+ // partitioned
+ private final List<String> partitionKeys;
+
+ // Properties of the table
+ private final Map<String, String> options;
+
+ // Comment of the table
+ private final String comment;
private final Table table;
private final Map<String, String> nonPhysicalColumnComments;
public DataCatalogTable(
Table table,
- TableSchema tableSchema,
+ Schema resolvedSchema,
List<String> partitionKeys,
- Map<String, String> properties,
+ Map<String, String> options,
String comment,
Map<String, String> nonPhysicalColumnComments) {
- super(tableSchema, partitionKeys, properties, comment);
+ this.schema = resolvedSchema;
+ this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
+ this.options = checkNotNull(options, "options cannot be null");
+
+ checkArgument(
+ options.entrySet().stream()
+ .allMatch(e -> e.getKey() != null && e.getValue() != null),
+ "properties cannot have null keys or values");
+
+ this.comment = comment;
+
this.table = table;
this.nonPhysicalColumnComments = nonPhysicalColumnComments;
}
@@ -66,32 +88,30 @@ public class DataCatalogTable extends CatalogTableImpl {
.filter(dataField -> dataField.description() != null)
.collect(Collectors.toMap(DataField::name, DataField::description));
- return toSchema(getSchema(), columnComments);
+ return toSchema(schema, columnComments);
}
- /** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */
- private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) {
+ private Schema toSchema(Schema tableSchema, Map<String, String> comments) {
final Schema.Builder builder = Schema.newBuilder();
-
tableSchema
- .getTableColumns()
+ .getColumns()
.forEach(
column -> {
- if (column instanceof TableColumn.PhysicalColumn) {
- final TableColumn.PhysicalColumn c =
- (TableColumn.PhysicalColumn) column;
- builder.column(c.getName(), c.getType());
- } else if (column instanceof TableColumn.MetadataColumn) {
- final TableColumn.MetadataColumn c =
- (TableColumn.MetadataColumn) column;
+ if (column instanceof Schema.UnresolvedPhysicalColumn) {
+ final Schema.UnresolvedPhysicalColumn c =
+ (Schema.UnresolvedPhysicalColumn) column;
+ builder.column(c.getName(), c.getDataType());
+ } else if (column instanceof Schema.UnresolvedMetadataColumn) {
+ final Schema.UnresolvedMetadataColumn c =
+ (Schema.UnresolvedMetadataColumn) column;
builder.columnByMetadata(
c.getName(),
- c.getType(),
- c.getMetadataAlias().orElse(null),
+ c.getDataType(),
+ c.getMetadataKey(),
c.isVirtual());
- } else if (column instanceof TableColumn.ComputedColumn) {
- final TableColumn.ComputedColumn c =
- (TableColumn.ComputedColumn) column;
+ } else if (column instanceof Schema.UnresolvedComputedColumn) {
+ final Schema.UnresolvedComputedColumn c =
+ (Schema.UnresolvedComputedColumn) column;
builder.columnByExpression(c.getName(), c.getExpression());
} else {
throw new IllegalArgumentException(
@@ -104,19 +124,16 @@ public class DataCatalogTable extends CatalogTableImpl {
builder.withComment(nonPhysicalColumnComments.get(colName));
}
});
-
tableSchema
.getWatermarkSpecs()
.forEach(
spec ->
builder.watermark(
- spec.getRowtimeAttribute(), spec.getWatermarkExpr()));
-
+ spec.getColumnName(), spec.getWatermarkExpression()));
if (tableSchema.getPrimaryKey().isPresent()) {
- UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get();
- builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns());
+ Schema.UnresolvedPrimaryKey primaryKey = tableSchema.getPrimaryKey().get();
+ builder.primaryKeyNamed(primaryKey.getConstraintName(), primaryKey.getColumnNames());
}
-
return builder.build();
}
@@ -124,7 +141,7 @@ public class DataCatalogTable extends CatalogTableImpl {
public CatalogBaseTable copy() {
return new DataCatalogTable(
table,
- getSchema().copy(),
+ schema,
new ArrayList<>(getPartitionKeys()),
new HashMap<>(getOptions()),
getComment(),
@@ -135,10 +152,40 @@ public class DataCatalogTable extends CatalogTableImpl {
public CatalogTable copy(Map<String, String> options) {
return new DataCatalogTable(
table,
- getSchema(),
+ schema,
getPartitionKeys(),
options,
getComment(),
nonPhysicalColumnComments);
}
+
+ @Override
+ public Optional<String> getDescription() {
+ return Optional.of(getComment());
+ }
+
+ @Override
+ public Optional<String> getDetailedDescription() {
+ return Optional.of("This is a catalog table in an im-memory catalog");
+ }
+
+ @Override
+ public boolean isPartitioned() {
+ return !partitionKeys.isEmpty();
+ }
+
+ @Override
+ public List<String> getPartitionKeys() {
+ return partitionKeys;
+ }
+
+ @Override
+ public Map<String, String> getOptions() {
+ return options;
+ }
+
+ @Override
+ public String getComment() {
+ return comment != null ? comment : "";
+ }
}
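
(Aside: a minimal sketch of the unresolved-column dispatch used by toSchema
above, assuming Flink's public Schema API; names are invented for
illustration.)

    Schema schema =
            Schema.newBuilder()
                    .column("id", DataTypes.INT())
                    .columnByExpression("id2", "`id` * 2")
                    .build();
    for (Schema.UnresolvedColumn column : schema.getColumns()) {
        if (column instanceof Schema.UnresolvedPhysicalColumn) {
            // declared with a concrete data type
        } else if (column instanceof Schema.UnresolvedComputedColumn) {
            // declared via a SQL expression, resolved later
        }
    }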
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index c67e79c1c0..3a7f9790cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
+import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
@@ -46,7 +47,6 @@ import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
-import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -96,7 +96,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
-import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
@@ -121,13 +120,6 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
@@ -152,11 +144,18 @@ import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.paimon.flink.utils.TableStatsUtil.createTableColumnStats;
import static org.apache.paimon.flink.utils.TableStatsUtil.createTableStats;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -1008,18 +1007,18 @@ public class FlinkCatalog extends AbstractCatalog {
}
// materialized table is not resolved at this time.
if (!table1IsMaterialized) {
- org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
- org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
+ org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema();
+ org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema();
boolean pkEquality = false;
if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
pkEquality =
Objects.equals(
- ts1.getPrimaryKey().get().getType(),
- ts2.getPrimaryKey().get().getType())
+ ts1.getPrimaryKey().get().getConstraintName(),
+ ts2.getPrimaryKey().get().getConstraintName())
&& Objects.equals(
- ts1.getPrimaryKey().get().getColumns(),
- ts2.getPrimaryKey().get().getColumns());
+ ts1.getPrimaryKey().get().getColumnNames(),
+ ts2.getPrimaryKey().get().getColumnNames());
} else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
pkEquality = true;
}
@@ -1063,7 +1062,8 @@ public class FlinkCatalog extends AbstractCatalog {
private CatalogBaseTable toCatalogTable(Table table) {
Map<String, String> newOptions = new HashMap<>(table.options());
- TableSchema.Builder builder = TableSchema.builder();
+ org.apache.flink.table.api.Schema.Builder builder =
+ org.apache.flink.table.api.Schema.newBuilder();
Map<String, String> nonPhysicalColumnComments = new HashMap<>();
// add columns
@@ -1078,10 +1078,10 @@ public class FlinkCatalog extends AbstractCatalog {
if (optionalName == null || physicalColumns.contains(optionalName)) {
// build physical column from table row field
RowType.RowField field = physicalRowFields.get(physicalColumnIndex++);
- builder.field(field.getName(), fromLogicalToDataType(field.getType()));
+ builder.column(field.getName(), fromLogicalToDataType(field.getType()));
} else {
// build non-physical column from options
- builder.add(deserializeNonPhysicalColumn(newOptions, i));
+ deserializeNonPhysicalColumn(newOptions, i, builder);
if (newOptions.containsKey(compoundKey(SCHEMA, i, COMMENT))) {
nonPhysicalColumnComments.put(
optionalName, newOptions.get(compoundKey(SCHEMA, i, COMMENT)));
@@ -1093,22 +1093,18 @@ public class FlinkCatalog extends AbstractCatalog {
// extract watermark information
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
- builder.watermark(deserializeWatermarkSpec(newOptions));
+ deserializeWatermarkSpec(newOptions, builder);
}
// add primary keys
if (table.primaryKeys().size() > 0) {
- builder.primaryKey(
- table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")),
- table.primaryKeys().toArray(new String[0]));
+ builder.primaryKey(table.primaryKeys());
}
- TableSchema schema = builder.build();
+ org.apache.flink.table.api.Schema schema = builder.build();
// remove schema from options
- DescriptorProperties removeProperties = new DescriptorProperties(false);
- removeProperties.putTableSchema(SCHEMA, schema);
- removeProperties.asMap().keySet().forEach(newOptions::remove);
+ FlinkDescriptorProperties.removeSchemaKeys(SCHEMA, schema, newOptions);
Options options = Options.fromMap(newOptions);
if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) {
@@ -1124,7 +1120,10 @@ public class FlinkCatalog extends AbstractCatalog {
}
private CatalogMaterializedTable buildMaterializedTable(
- Table table, Map<String, String> newOptions, TableSchema schema, Options options) {
+ Table table,
+ Map<String, String> newOptions,
+ org.apache.flink.table.api.Schema schema,
+ Options options) {
String definitionQuery = options.get(MATERIALIZED_TABLE_DEFINITION_QUERY);
IntervalFreshness freshness =
IntervalFreshness.of(
@@ -1148,7 +1147,7 @@ public class FlinkCatalog extends AbstractCatalog {
// remove materialized table related options
allMaterializedTableAttributes().forEach(newOptions::remove);
return CatalogMaterializedTable.newBuilder()
- .schema(schema.toSchema())
+ .schema(schema)
.comment(table.comment().orElse(""))
.partitionKeys(table.partitionKeys())
.options(newOptions)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
index 37bed2d048..75af5917bb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java
@@ -48,7 +48,6 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
-import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.procedures.Procedure;
import java.util.List;
@@ -86,11 +85,6 @@ public class FlinkGenericCatalog extends AbstractCatalog {
new FlinkGenericTableFactory(paimon.getFactory().get(), flink.getFactory().get()));
}
- @Override
- public Optional<TableFactory> getTableFactory() {
- return flink.getTableFactory();
- }
-
@Override
public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
return flink.getFunctionDefinitionFactory();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
index d5d843d91b..f88a808713 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java
@@ -22,7 +22,6 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AuditLogTable;
import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.types.utils.TypeConversions;
@@ -32,11 +31,11 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
/** A {@link CatalogTable} to represent system table. */
public class SystemCatalogTable implements CatalogTable {
@@ -60,11 +59,8 @@ public class SystemCatalogTable implements CatalogTable {
Map<String, String> newOptions = new HashMap<>(table.options());
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
- WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions);
- return builder.watermark(
- watermarkSpec.getRowtimeAttribute(), watermarkSpec.getWatermarkExpr())
- .build();
+ deserializeWatermarkSpec(newOptions, builder);
+ return builder.build();
}
}
return builder.build();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
index b0f99a6e89..fa84a1ca07 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
@@ -20,8 +20,7 @@ package org.apache.paimon.flink.utils;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.DataType;
@@ -36,48 +35,23 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
-import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
/**
* Utilities for ser/deserializing non-physical columns and watermark into/from a map of string
* properties.
*/
public class FlinkCatalogPropertiesUtil {
-
- public static Map<String, String> serializeNonPhysicalColumns(
- Map<String, Integer> indexMap, List<TableColumn> nonPhysicalColumns) {
- Map<String, String> serialized = new HashMap<>();
- for (TableColumn c : nonPhysicalColumns) {
- int index = indexMap.get(c.getName());
- serialized.put(compoundKey(SCHEMA, index, NAME), c.getName());
- serialized.put(
- compoundKey(SCHEMA, index, DATA_TYPE),
- c.getType().getLogicalType().asSerializableString());
- if (c instanceof TableColumn.ComputedColumn) {
- TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn) c;
- serialized.put(compoundKey(SCHEMA, index, EXPR), computedColumn.getExpression());
- } else {
- TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn) c;
- serialized.put(
- compoundKey(SCHEMA, index, METADATA),
- metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
- serialized.put(
- compoundKey(SCHEMA, index, VIRTUAL),
- Boolean.toString(metadataColumn.isVirtual()));
- }
- }
- return serialized;
- }
+ public static final String SCHEMA = "schema";
/** Serialize non-physical columns of new api. */
public static Map<String, String> serializeNonPhysicalNewColumns(ResolvedSchema schema) {
@@ -119,22 +93,6 @@ public class FlinkCatalogPropertiesUtil {
return serialized;
}
- public static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
- Map<String, String> serializedWatermarkSpec = new HashMap<>();
- String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
- serializedWatermarkSpec.put(
- compoundKey(watermarkPrefix, WATERMARK_ROWTIME),
- watermarkSpec.getRowtimeAttribute());
- serializedWatermarkSpec.put(
- compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
- watermarkSpec.getWatermarkExpr());
- serializedWatermarkSpec.put(
- compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE),
- watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
-
- return serializedWatermarkSpec;
- }
-
public static Map<String, String> serializeNewWatermarkSpec(
org.apache.flink.table.catalog.WatermarkSpec watermarkSpec) {
Map<String, String> serializedWatermarkSpec = new HashMap<>();
@@ -219,7 +177,8 @@ public class FlinkCatalogPropertiesUtil {
&&
SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches();
}
- public static TableColumn deserializeNonPhysicalColumn(Map<String, String> options, int index) {
+ public static void deserializeNonPhysicalColumn(
+ Map<String, String> options, int index, Schema.Builder builder) {
String nameKey = compoundKey(SCHEMA, index, NAME);
String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE);
String exprKey = compoundKey(SCHEMA, index, EXPR);
@@ -227,45 +186,42 @@ public class FlinkCatalogPropertiesUtil {
String virtualKey = compoundKey(SCHEMA, index, VIRTUAL);
String name = options.get(nameKey);
- DataType dataType =
- TypeConversions.fromLogicalToDataType(
- LogicalTypeParser.parse(options.get(dataTypeKey)));
- TableColumn column;
if (options.containsKey(exprKey)) {
- column = TableColumn.computed(name, dataType, options.get(exprKey));
+ final String expr = options.get(exprKey);
+ builder.columnByExpression(name, expr);
} else if (options.containsKey(metadataKey)) {
String metadataAlias = options.get(metadataKey);
boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
- column =
- metadataAlias.equals(name)
- ? TableColumn.metadata(name, dataType, isVirtual)
- : TableColumn.metadata(name, dataType, metadataAlias, isVirtual);
+ DataType dataType =
+ TypeConversions.fromLogicalToDataType(
+ LogicalTypeParser.parse(
+ options.get(dataTypeKey),
+ Thread.currentThread().getContextClassLoader()));
+ if (metadataAlias.equals(name)) {
+ builder.columnByMetadata(name, dataType, isVirtual);
+ } else {
+ builder.columnByMetadata(name, dataType, metadataAlias, isVirtual);
+ }
} else {
throw new RuntimeException(
String.format(
"Failed to build non-physical column. Current
index is %s, options are %s",
index, options));
}
-
- return column;
}
- public static WatermarkSpec deserializeWatermarkSpec(Map<String, String> options) {
+ public static void deserializeWatermarkSpec(
+ Map<String, String> options, Schema.Builder builder) {
String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK);
String rowtimeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_ROWTIME);
String exprKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_EXPR);
- String dataTypeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_DATA_TYPE);
String rowtimeAttribute = options.get(rowtimeKey);
String watermarkExpressionString = options.get(exprKey);
- DataType watermarkExprOutputType =
- TypeConversions.fromLogicalToDataType(
- LogicalTypeParser.parse(options.get(dataTypeKey)));
- return new WatermarkSpec(
- rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType);
+ builder.watermark(rowtimeAttribute, watermarkExpressionString);
}
public static String compoundKey(Object... components) {
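
(For reference, a hedged sketch of the property-key layout these helpers read
and write; the '.'-joined form is inferred from the removal logic in
FlinkDescriptorProperties below, not quoted from this patch.)

    // compoundKey(SCHEMA, 2, NAME)      -> "schema.2.name"
    // compoundKey(SCHEMA, 2, DATA_TYPE) -> "schema.2.data-type"
    // compoundKey(SCHEMA, WATERMARK, 0, WATERMARK_ROWTIME) -> "schema.watermark.0.rowtime"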
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java
new file mode 100644
index 0000000000..edc73ca7bf
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.table.api.Schema;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for having a unified string-based representation of Table API related classes such
+ * as Schema, TypeInformation, etc.
+ *
+ * <p>Note to implementers: Please try to reuse key names as much as possible. Key-names should be
+ * hierarchical and lower case. Use "-" instead of dots or camel case. E.g.,
+ * connector.schema.start-from = from-earliest. Try not to use the higher level in a key-name. E.g.,
+ * instead of connector.kafka.kafka-version use connector.kafka.version.
+ *
+ * <p>Properties with key normalization enabled contain only lower-case keys.
+ */
+public class FlinkDescriptorProperties {
+
+ public static final String NAME = "name";
+
+ public static final String DATA_TYPE = "data-type";
+
+ public static final String EXPR = "expr";
+
+ public static final String METADATA = "metadata";
+
+ public static final String VIRTUAL = "virtual";
+
+ public static final String WATERMARK = "watermark";
+
+ public static final String WATERMARK_ROWTIME = "rowtime";
+
+ public static final String WATERMARK_STRATEGY = "strategy";
+
+ public static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR;
+
+ public static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE;
+
+ public static final String PRIMARY_KEY_NAME = "primary-key.name";
+
+ public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns";
+
+ public static final String COMMENT = "comment";
+
+ public static void removeSchemaKeys(String key, Schema schema, Map<String, String> options) {
+ checkNotNull(key);
+ checkNotNull(schema);
+
+ List<String> subKeys = Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL);
+ for (int idx = 0; idx < schema.getColumns().size(); idx++) {
+ for (String subKey : subKeys) {
+ options.remove(key + '.' + idx + '.' + subKey);
+ }
+ }
+
+ if (!schema.getWatermarkSpecs().isEmpty()) {
+ subKeys =
+ Arrays.asList(
+ WATERMARK_ROWTIME,
+ WATERMARK_STRATEGY_EXPR,
+ WATERMARK_STRATEGY_DATA_TYPE);
+ for (int idx = 0; idx < schema.getWatermarkSpecs().size(); idx++) {
+ for (String subKey : subKeys) {
+ options.remove(key + '.' + WATERMARK + '.' + idx + '.' + subKey);
+ }
+ }
+ }
+
+ schema.getPrimaryKey()
+ .ifPresent(
+ pk -> {
+ options.remove(key + '.' + PRIMARY_KEY_NAME);
+ options.remove(key + '.' + PRIMARY_KEY_COLUMNS);
+ });
+ }
+}
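
(A hedged usage sketch of removeSchemaKeys, not part of the patch; it assumes
the '.'-joined key layout and the constants declared above.)

    Map<String, String> options = new HashMap<>();
    options.put("schema.0.name", "id");
    options.put("schema.0.data-type", "INT");
    options.put("bucket", "4"); // unrelated option, should survive
    Schema schema = Schema.newBuilder().column("id", DataTypes.INT()).build();
    FlinkDescriptorProperties.removeSchemaKeys("schema", schema, options);
    // options now contains only "bucket"; both schema.0.* keys are gone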
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
index 9268a236b6..e32150b1fe 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
@@ -21,27 +21,35 @@ package org.apache.paimon.flink;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.WatermarkSpec;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionVisitor;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.types.DataType;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link FlinkCatalogPropertiesUtil}. */
@@ -49,18 +57,27 @@ public class FlinkCatalogPropertiesUtilTest {
@Test
public void testSerDeNonPhysicalColumns() {
- Map<String, Integer> indexMap = new HashMap<>();
- indexMap.put("comp", 2);
- indexMap.put("meta1", 3);
- indexMap.put("meta2", 5);
- List<TableColumn> columns = new ArrayList<>();
- columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2"));
- columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10)));
- columns.add(TableColumn.metadata("meta2",
DataTypes.BIGINT().notNull(), "price", true));
+ List<Schema.UnresolvedColumn> columns = new ArrayList<>();
+ columns.add(new Schema.UnresolvedComputedColumn("comp", new SqlCallExpression("`k` * 2")));
+ columns.add(
+ new Schema.UnresolvedMetadataColumn("meta1",
DataTypes.VARCHAR(10), null, false));
+ columns.add(
+ new Schema.UnresolvedMetadataColumn(
+ "meta2", DataTypes.BIGINT().notNull(), "price", true,
null));
+
+ List<Column> resolvedColumns = new ArrayList<>();
+ resolvedColumns.add(Column.physical("phy1", DataTypes.INT()));
+ resolvedColumns.add(Column.physical("phy2", DataTypes.INT()));
+ resolvedColumns.add(
+ Column.computed("comp", new TestResolvedExpression("`k` * 2", DataTypes.INT())));
+ resolvedColumns.add(Column.metadata("meta1", DataTypes.VARCHAR(10),
null, false));
+ resolvedColumns.add(Column.physical("phy3", DataTypes.INT()));
+ resolvedColumns.add(Column.metadata("meta2",
DataTypes.BIGINT().notNull(), "price", true));
// validate serialization
Map<String, String> serialized =
- FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns);
+ FlinkCatalogPropertiesUtil.serializeNonPhysicalNewColumns(
+ new ResolvedSchema(resolvedColumns, Collections.emptyList(), null));
Map<String, String> expected = new HashMap<>();
expected.put(compoundKey(SCHEMA, 2, NAME), "comp");
@@ -80,27 +97,26 @@ public class FlinkCatalogPropertiesUtilTest {
assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
// validate deserialization
- List<TableColumn> deserialized = new ArrayList<>();
- deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2));
- deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3));
- deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5));
+ Schema.Builder builder = Schema.newBuilder();
+ FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2, builder);
+ FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3, builder);
+ FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5, builder);
- assertThat(deserialized).isEqualTo(columns);
-
- // validate that
+ assertThat(builder.build().getColumns())
+ .containsExactly(columns.toArray(new Schema.UnresolvedColumn[0]));
}
@Test
public void testSerDeWatermarkSpec() {
WatermarkSpec watermarkSpec =
- new WatermarkSpec(
+ WatermarkSpec.of(
"test_time",
- "`test_time` - INTERVAL '0.001' SECOND",
- DataTypes.TIMESTAMP(3));
+ new TestResolvedExpression(
+ "`test_time` - INTERVAL '0.001' SECOND",
DataTypes.TIMESTAMP(3)));
// validate serialization
Map<String, String> serialized =
- FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec);
+ FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec(watermarkSpec);
Map<String, String> expected = new HashMap<>();
String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
@@ -113,9 +129,13 @@ public class FlinkCatalogPropertiesUtilTest {
assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
// validate deserialization
- WatermarkSpec deserialized =
- FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized);
- assertThat(deserialized).isEqualTo(watermarkSpec);
+ Schema.Builder builder = Schema.newBuilder();
+ FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized, builder);
+ assertThat(builder.build().getWatermarkSpecs()).hasSize(1);
+ Schema.UnresolvedWatermarkSpec actual = builder.build().getWatermarkSpecs().get(0);
+ assertThat(actual.getColumnName()).isEqualTo(watermarkSpec.getRowtimeAttribute());
+ assertThat(actual.getWatermarkExpression().asSummaryString())
+ .isEqualTo(watermarkSpec.getWatermarkExpression().asSummaryString());
}
@Test
@@ -150,4 +170,44 @@ public class FlinkCatalogPropertiesUtilTest {
oldStyleOptions, Arrays.asList("phy1", "phy2")))
.isEqualTo(3);
}
+
+ private static class TestResolvedExpression implements ResolvedExpression {
+ private final String name;
+ private final DataType outputDataType;
+
+ private TestResolvedExpression(String name, DataType outputDataType) {
+ this.name = name;
+ this.outputDataType = outputDataType;
+ }
+
+ @Override
+ public DataType getOutputDataType() {
+ return outputDataType;
+ }
+
+ @Override
+ public List<ResolvedExpression> getResolvedChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String asSummaryString() {
+ return new SqlCallExpression(name).asSummaryString();
+ }
+
+ @Override
+ public String asSerializableString() {
+ return name;
+ }
+
+ @Override
+ public List<Expression> getChildren() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public <R> R accept(ExpressionVisitor<R> expressionVisitor) {
+ return null;
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 27a8951097..e4286eb181 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -850,7 +850,7 @@ public class FlinkCatalogTest {
assertThat(t2.getComment()).isEqualTo(t1.getComment());
assertThat(t2.getOptions()).isEqualTo(t1.getOptions());
if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) {
- assertThat(t2.getSchema()).isEqualTo(t1.getSchema());
+ assertThat(t2.getUnresolvedSchema()).isEqualTo(t1.getUnresolvedSchema());
assertThat(((CatalogTable) (t2)).getPartitionKeys())
.isEqualTo(((CatalogTable) (t1)).getPartitionKeys());
assertThat(((CatalogTable) (t2)).isPartitioned())
@@ -864,7 +864,12 @@ public class FlinkCatalogTest {
t2.getUnresolvedSchema()
.resolve(new TestSchemaResolver()))
.build())
- .isEqualTo(t1.getSchema().toSchema());
+ .isEqualTo(
+ Schema.newBuilder()
+ .fromResolvedSchema(
+ t1.getUnresolvedSchema()
+ .resolve(new TestSchemaResolver()))
+ .build());
assertThat(mt2.getPartitionKeys()).isEqualTo(mt1.getPartitionKeys());
assertThat(mt2.isPartitioned()).isEqualTo(mt1.isPartitioned());
// validate definition query