danielcweeks commented on code in PR #14979:
URL: https://github.com/apache/iceberg/pull/14979#discussion_r2669049457
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java:
##########
@@ -227,6 +229,22 @@ private void commitToTable(
return minOffset == null || envelope.offset() >= minOffset;
})
.map(envelope -> (DataWritten) envelope.event().payload())
+ .filter(
+ payload -> {
+ UUID expectedUuid = tableReference.uuid();
+ UUID payloadTableUuid = payload.tableReference().uuid();
+
+ if (expectedUuid != null && !expectedUuid.equals(payloadTableUuid)) {
Review Comment:
This is one of the difficult things with KC and errors like this. If we
fail here, the offending events stay in the control topic and will be read
again. The only options at that point are to clear the control topic or move
the offsets forward past them. However, doing either can lose commit data for
other tables, since events for multiple tables are intermixed in the same
topic.

I don't think there's really a safe recovery. If the skipped data needs to be
recovered, you would create a new consumer group for the control topic and
reset its offsets to a point before these events.
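
To illustrate the intermixing concern, here is a minimal, self-contained
sketch. It does not use the connector or any Kafka API: `ControlEvent`,
`tablesLostBySkipping`, and the table names are hypothetical stand-ins that
model the control topic as an in-memory list. It only shows which other tables
have events inside an offset range that gets skipped.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class ControlTopicSkipDemo {

  // Hypothetical stand-in for a control-topic record: an offset and the table it targets.
  static final class ControlEvent {
    final long offset;
    final String table;

    ControlEvent(long offset, String table) {
      this.offset = offset;
      this.table = table;
    }
  }

  // Returns the tables, other than badTable, that have events in [fromOffset, newOffset).
  // These are the tables whose commit data would be dropped by moving the offset forward.
  static Set<String> tablesLostBySkipping(
      List<ControlEvent> topic, long fromOffset, long newOffset, String badTable) {
    Set<String> lost = new TreeSet<>();
    for (ControlEvent event : topic) {
      boolean skipped = event.offset >= fromOffset && event.offset < newOffset;
      if (skipped && !event.table.equals(badTable)) {
        lost.add(event.table);
      }
    }
    return lost;
  }

  public static void main(String[] args) {
    // Assumed layout: events for a problem table interleaved with events for healthy tables.
    List<ControlEvent> topic =
        List.of(
            new ControlEvent(0, "db.orders"),
            new ControlEvent(1, "db.stale"),
            new ControlEvent(2, "db.customers"),
            new ControlEvent(3, "db.stale"),
            new ControlEvent(4, "db.orders"));

    // Moving the offset from 0 to 4 gets past both db.stale events,
    // but also skips events for the two healthy tables.
    System.out.println(tablesLostBySkipping(topic, 0, 4, "db.stale"));
    // prints [db.customers, db.orders]
  }
}
```

In this model, any forward move large enough to clear the problem events also
covers events for unrelated tables, which is why skipping is not a safe
recovery on its own.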
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]