stevenzwu commented on code in PR #6222:
URL: https://github.com/apache/iceberg/pull/6222#discussion_r1042787664
##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java:
##########
@@ -104,11 +105,38 @@ public static Schema convert(Schema baseSchema,
TableSchema flinkSchema) {
Types.StructType struct = convert(flinkSchema).asStruct();
// reassign ids to match the base schema
Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()),
baseSchema);
+ // reassign doc to match the base schema
+ schema = reassignDoc(schema, baseSchema);
+
// fix types that can't be represented in Flink (UUID)
Schema fixedSchema = FlinkFixupTypes.fixup(schema, baseSchema);
return freshIdentifierFieldIds(fixedSchema, flinkSchema);
}
+ private static Schema reassignDoc(Schema schema, Schema docSourceSchema) {
+ TypeUtil.CustomOrderSchemaVisitor<Type> visitor = new
FlinkFixupDoc(docSourceSchema);
Review Comment:
`FlinkFixupDoc` doesn't seem like the right name. it is not fixing up sth
(like `FlinkFixupTypes` does). Maybe call it `ReassignDoc` or `CopyDoc`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]