amogh-jahagirdar commented on code in PR #14979:
URL: https://github.com/apache/iceberg/pull/14979#discussion_r2666799219
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -227,6 +229,22 @@ private void commitToTable(
return minOffset == null || envelope.offset() >= minOffset;
})
.map(envelope -> (DataWritten) envelope.event().payload())
+ .filter(
+ payload -> {
+ UUID expectedUuid = tableReference.uuid();
+ UUID payloadTableUuid = payload.tableReference().uuid();
Review Comment:
Looks like `expectedUuid` is based off the table reference here, but
technically we also just did a table load at the beginning of this function,
and with unlucky timing a concurrent drop + create could've happened in between.
In the end, the assertUUID check in the catalog commit would catch that case,
so we're covered from a commit perspective. But in terms of which UUID we
should compare against, I wonder if we should use the UUID from the loaded
table instead?
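A minimal sketch of the suggestion, comparing payloads against the UUID of the table that was actually loaded rather than the one on the table reference. `TableRef` and `Payload` are hypothetical stand-ins for the connector's `TableReference` and `DataWritten` types, and keeping payloads that carry no UUID is an assumption made here to preserve prior behavior, not necessarily what the PR does.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

public class UuidFilterSketch {

  // Hypothetical stand-ins for the connector's TableReference and DataWritten payload.
  record TableRef(String name, UUID uuid) {}

  record Payload(TableRef tableReference, String file) {}

  // Keep only payloads written against the table instance we actually loaded.
  // Assumption: a payload without a UUID (e.g. from an older worker) is kept.
  static List<Payload> filterForLoadedTable(List<Payload> payloads, UUID loadedTableUuid) {
    return payloads.stream()
        .filter(
            payload -> {
              UUID payloadUuid = payload.tableReference().uuid();
              return payloadUuid == null || payloadUuid.equals(loadedTableUuid);
            })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    UUID oldUuid = UUID.randomUUID();
    // Simulates a drop + create between building the reference and loading the table.
    UUID newUuid = UUID.randomUUID();
    List<Payload> payloads =
        List.of(
            new Payload(new TableRef("db.tbl", oldUuid), "a.parquet"),
            new Payload(new TableRef("db.tbl", newUuid), "b.parquet"),
            new Payload(new TableRef("db.tbl", null), "c.parquet"));
    // a.parquet was written against the dropped table, so it is filtered out.
    System.out.println(filterForLoadedTable(payloads, newUuid).size()); // prints 2
  }
}
```

With this shape, the commit-time UUID assertion stays the safety net, and the filter simply agrees with whatever table the coordinator is about to commit to.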
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/IcebergWriterFactory.java:
##########
@@ -67,7 +69,15 @@ RecordWriter createWriter(String tableName, SinkRecord sample, boolean ignoreMis
}
}
- return new IcebergWriter(table, tableName, config);
+ UUID tableUuid = table.uuid();
+ if (tableUuid == null) {
Review Comment:
I wouldn't expect any case where this is null? I guess it's OK to be
defensive here, though: if it somehow is null, this preserves the existing
behavior.
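A minimal sketch of the defensive pattern being discussed: when the UUID is null, fall back to the pre-change construction instead of failing. `Writer` and `createWriter` here are hypothetical simplifications, not the connector's actual `IcebergWriter` constructor, whose post-change signature isn't shown in the hunk.

```java
import java.util.Optional;
import java.util.UUID;

public class WriterFactorySketch {

  // Hypothetical writer: tags output with a table UUID only when one is known.
  record Writer(String tableName, Optional<UUID> tableUuid) {}

  // Defensive against a null UUID: fall back to the pre-change behavior
  // (no UUID tagging) rather than throwing during writer creation.
  static Writer createWriter(String tableName, UUID tableUuid) {
    if (tableUuid == null) {
      return new Writer(tableName, Optional.empty());
    }
    return new Writer(tableName, Optional.of(tableUuid));
  }

  public static void main(String[] args) {
    System.out.println(createWriter("db.tbl", null).tableUuid().isPresent()); // prints false
    System.out.println(createWriter("db.tbl", UUID.randomUUID()).tableUuid().isPresent()); // prints true
  }
}
```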
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.