pvary commented on code in PR #9606:
URL: https://github.com/apache/iceberg/pull/9606#discussion_r1502277710
##########
flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -68,6 +73,42 @@ public static Schema convert(TableSchema schema) {
return freshIdentifierFieldIds(iSchema, schema);
}
+  /** Convert the flink table schema to apache iceberg schema with column comment. */
+  public static Schema convert(TableSchema schema, Map<String, String> columnComments) {
Review Comment:
I think it would be better not to introduce new methods that use the deprecated `TableSchema`.
Maybe we could do something like this instead:
```java
/** Convert the flink table schema to apache iceberg schema with column comment. */
public static Schema convert(ResolvedCatalogTable catalogTable) {
  List<Column> tableColumns = catalogTable.getResolvedSchema().getColumns();
  // copy from org.apache.flink.table.api.Schema#toRowDataType
  DataTypes.Field[] fields =
      tableColumns.stream()
          .map(
              column -> {
                if (column.getComment().isPresent()) {
                  return DataTypes.FIELD(
                      column.getName(), column.getDataType(), column.getComment().get());
                } else {
                  return DataTypes.FIELD(column.getName(), column.getDataType());
                }
              })
          .toArray(DataTypes.Field[]::new);

  LogicalType schemaType = DataTypes.ROW(fields).notNull().getLogicalType();
  Preconditions.checkArgument(
      schemaType instanceof RowType, "Schema logical type should be RowType.");

  RowType root = (RowType) schemaType;
  Type converted = root.accept(new FlinkTypeToType(root));
  Schema iSchema = new Schema(converted.asStructType().fields());
  return freshIdentifierFieldIds(iSchema, catalogTable.getSchema());
}
```
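For reference, the comment travels through because `DataTypes.FIELD(name, type, description)` stores the description on the resulting `RowType.RowField`, which is where `FlinkTypeToType` could pick it up (assuming this PR teaches it to read the field description). A quick round-trip sketch, illustration only, not code from the PR:
```java
// The description passed to DataTypes.FIELD surfaces as RowField#getDescription,
// which is where the converter can read the column comment from.
RowType row =
    (RowType)
        DataTypes.ROW(DataTypes.FIELD("id", DataTypes.BIGINT(), "primary id"))
            .notNull()
            .getLogicalType();
row.getFields().get(0).getDescription(); // Optional[primary id]
```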
Since `Catalog.createTable` javadoc says (since Flink 1.13.0 /
[FLINK-21396](https://issues.apache.org/jira/browse/FLINK-21396)):
```
Creates a new table or view.
The framework will make sure to call this method with fully validated
ResolvedCatalogTable or ResolvedCatalogView. Those instances are
easy to serialize for a durable catalog implementation.
```
We might be able to change `FlinkCatalog.createTable` to cast the table parameter to `ResolvedCatalogTable`:
```java
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
    throws CatalogException, TableAlreadyExistException {
  if (Objects.equals(
      table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
    throw new IllegalArgumentException(
        "Cannot create the table with 'connector'='iceberg' table property in "
            + "an iceberg catalog, Please create table with 'connector'='iceberg' property in a "
            + "non-iceberg catalog or create table without 'connector'='iceberg' related "
            + "properties in an iceberg table.");
  }

  createIcebergTable(tablePath, (ResolvedCatalogTable) table, ignoreIfExists);
}
```
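To make the quoted guarantee concrete, a test could build the resolved table by hand, mirroring what the planner passes after validation. This is only a sketch: `catalog` and `tablePath` are assumed test fixtures, and `Schema` below is Flink's `org.apache.flink.table.api.Schema`, not Iceberg's:
```java
// Columns with comments, as the planner would resolve them.
ResolvedSchema resolvedSchema =
    ResolvedSchema.of(
        Column.physical("id", DataTypes.BIGINT()).withComment("primary id"),
        Column.physical("data", DataTypes.STRING()));

// Wrap them the same way the framework does before calling createTable.
CatalogTable origin =
    CatalogTable.of(
        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
        "table comment",
        Collections.emptyList(),
        Collections.emptyMap());

catalog.createTable(tablePath, new ResolvedCatalogTable(origin, resolvedSchema), false);
```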
and `FlinkCatalog.createIcebergTable` to accept only `ResolvedCatalogTable`, like:
```java
void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolean ignoreIfExists)
    throws CatalogException, TableAlreadyExistException {
  validateFlinkTable(table);
```
In this case we do not need the `instanceof` check, and we would also make progress toward removing the usage of the deprecated `TableSchema`.
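A hypothetical test, building on the `resolvedSchema`/`origin` sketch above, could then assert that the comment makes it through the new `convert` into the Iceberg field doc:
```java
// Hypothetical assertion: the Iceberg field doc should carry the Flink column comment.
org.apache.iceberg.Schema iSchema =
    FlinkSchemaUtil.convert(new ResolvedCatalogTable(origin, resolvedSchema));
Assert.assertEquals("primary id", iSchema.findField("id").doc());
```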