kingeasternsun commented on a change in pull request #3681:
URL: https://github.com/apache/iceberg/pull/3681#discussion_r817389286
##########
File path: flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
##########
@@ -149,14 +158,44 @@ public static TableSchema toSchema(RowType rowType) {
    * @param schema iceberg schema to convert.
    * @return Flink TableSchema.
    */
-  public static TableSchema toSchema(Schema schema) {
+  public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
     TableSchema.Builder builder = TableSchema.builder();

     // Add columns.
     for (RowType.RowField field : convert(schema).getFields()) {
       builder.field(field.getName(), TypeConversions.fromLogicalToDataType(field.getType()));
     }

+    // Add watermark
+    final int watermarkCount =

Review comment:

> We can check if a column is used as watermark column, and add its field type as TIMESTAMP(3)

Maybe like this? @yittg

```java
public static TableSchema toSchema(Schema schema, Map<String, String> properties) {
  TableSchema.Builder builder = TableSchema.builder();
  HashMap<String, DataType> watermarkRow = new HashMap<String, DataType>();

  // Add watermark
  final int watermarkCount = properties.keySet().stream()
      .filter((k) -> k.startsWith(FLINK_PREFIX + DescriptorProperties.WATERMARK)
          && k.endsWith('.' + DescriptorProperties.WATERMARK_ROWTIME))
      .mapToInt((k) -> 1)
      .sum();
  if (watermarkCount > 0) {
    for (int i = 0; i < watermarkCount; i++) {
      final String rowtimeKey = FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
          DescriptorProperties.WATERMARK_ROWTIME;
      final String exprKey = FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
          DescriptorProperties.WATERMARK_STRATEGY_EXPR;
      final String typeKey = FLINK_PREFIX + DescriptorProperties.WATERMARK + '.' + i + '.' +
          DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
      final String rowtime = optionalGet(rowtimeKey, properties).orElseThrow(exceptionSupplier(rowtimeKey));
      final String exprString = optionalGet(exprKey, properties).orElseThrow(exceptionSupplier(exprKey));
      final String typeString = optionalGet(typeKey, properties).orElseThrow(exceptionSupplier(typeKey));
      final DataType exprType = TypeConversions.fromLogicalToDataType(LogicalTypeParser.parse(typeString));
      builder.watermark(rowtime, exprString, exprType);
      watermarkRow.put(rowtime, exprType);
    }
  }

  // Add columns.
  for (RowType.RowField field : convert(schema).getFields()) {
    builder.field(
        field.getName(),
        watermarkRow.getOrDefault(field.getName(), TypeConversions.fromLogicalToDataType(field.getType())));
  }

  // Add primary key.
  Set<Integer> identifierFieldIds = schema.identifierFieldIds();
  if (!identifierFieldIds.isEmpty()) {
    List<String> columns = Lists.newArrayListWithExpectedSize(identifierFieldIds.size());
    for (Integer identifierFieldId : identifierFieldIds) {
      String columnName = schema.findColumnName(identifierFieldId);
      Preconditions.checkNotNull(columnName, "Cannot find field with id %s in schema %s", identifierFieldId, schema);
      columns.add(columnName);
    }
    builder.primaryKey(columns.toArray(new String[0]));
  }

  return builder.build();
}
```
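As a side note, here is a minimal, self-contained sketch of how the proposed overload would be driven from table properties. Everything concrete in it is assumed for illustration: the `flink.` prefix stands in for whatever `FLINK_PREFIX` actually resolves to in this PR, the `.0.rowtime` / `.0.strategy.expr` / `.0.strategy.data-type` suffixes follow Flink's `DescriptorProperties` constants, and the example class, schema, field names, and watermark expression are made up.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Types;

public class WatermarkToSchemaExample {
  public static void main(String[] args) {
    // Iceberg schema with the event-time column the watermark refers to.
    Schema icebergSchema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.required(2, "ts", Types.TimestampType.withoutZone()));

    // Flattened watermark spec stored in the table properties. The "flink." prefix is an
    // assumption for FLINK_PREFIX; the suffixes match Flink's DescriptorProperties constants.
    Map<String, String> properties = new HashMap<>();
    properties.put("flink.watermark.0.rowtime", "ts");
    properties.put("flink.watermark.0.strategy.expr", "`ts` - INTERVAL '5' SECOND");
    properties.put("flink.watermark.0.strategy.data-type", "TIMESTAMP(3)");

    // Proposed overload from this PR: rebuilds the Flink TableSchema with the watermark
    // restored and the rowtime column typed as the watermark's data type.
    TableSchema tableSchema = FlinkSchemaUtil.toSchema(icebergSchema, properties);
    System.out.println(tableSchema);
  }
}
```

The reason for the `watermarkRow` lookup in the proposal is that a column used as a watermark's rowtime has to keep the data type recorded with the watermark (typically TIMESTAMP(3)) rather than the type the plain Iceberg-to-Flink conversion would produce, so that the column type and the watermark expression line up.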