kumarpritam863 commented on code in PR #15396:
URL: https://github.com/apache/iceberg/pull/15396#discussion_r3032556075
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/SchemaUtils.java:
##########
@@ -126,8 +128,20 @@ private static void commitSchemaUpdates(Table table, SchemaUpdate.Consumer updat
update -> updateSchema.addColumn(update.parentName(), update.name(), update.type()));
updateTypes.forEach(update -> updateSchema.updateColumn(update.name(), update.type()));
makeOptionals.forEach(update -> updateSchema.makeColumnOptional(update.name()));
- updateSchema.commit();
- LOG.info("Schema for table {} updated with new columns", table.name());
+ try {
+ updateSchema.commit();
+ LOG.info("Schema for table {} updated with new columns", table.name());
+ } catch (ValidationException | CommitFailedException e) {
+ // this can happen if the field already exists in the table, which is fine (other task
+ // committed the schema). No action required
Review Comment:
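Yes, this exception catching is dangerous: treating every `ValidationException` or `CommitFailedException` as a no-op would also swallow genuine validation and commit failures, not just the case where another task already committed the same schema change. The scenario where another task has already added the new column is handled by the existing retry: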
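```java
// Retry the schema commit up to SCHEMA_UPDATE_RETRIES times, e.g. when a
// concurrent commit from another task makes an earlier attempt fail.
Tasks.range(1)
    .retry(IcebergSinkConfig.SCHEMA_UPDATE_RETRIES)
    .run(notUsed -> commitSchemaUpdates(table, updates));
```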
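A minimal, dependency-free sketch of the pattern argued for above, under stated assumptions: instead of catching the exception and moving on, each attempt re-reads the table, skips columns that are already present, and only a commit conflict triggers another attempt, so a genuine validation failure still surfaces. `FakeTable`, `ConflictException`, `retryOnConflict` and `addMissingColumns` are hypothetical names for illustration only; they are not Iceberg APIs and this is not how `Tasks` or `SchemaUtils` are implemented. Retrying only on conflicts is a policy choice of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch; none of these types are Iceberg APIs.
final class SchemaRetrySketch {

  /** Stand-in for a commit conflict: the table changed between read and commit. */
  static final class ConflictException extends RuntimeException {
    ConflictException(String message) {
      super(message);
    }
  }

  /** Toy table with optimistic-concurrency commits keyed on a version number. */
  static final class FakeTable {
    private final Set<String> columns = new LinkedHashSet<>();
    private long version = 0;

    synchronized long version() {
      return version;
    }

    synchronized Set<String> columns() {
      return new LinkedHashSet<>(columns);
    }

    synchronized void commit(long baseVersion, List<String> newColumns) {
      if (baseVersion != version) {
        throw new ConflictException("table changed since version " + baseVersion);
      }
      for (String name : newColumns) {
        if (name.isBlank()) {
          // A genuine validation failure: retrying cannot fix it.
          throw new IllegalArgumentException("invalid column name");
        }
      }
      columns.addAll(newColumns);
      version++;
    }
  }

  /** Retries only on conflicts; any other exception propagates immediately. */
  static <T> T retryOnConflict(int maxAttempts, Supplier<T> attempt) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    ConflictException last = null;
    for (int i = 0; i < maxAttempts; i++) {
      try {
        return attempt.get();
      } catch (ConflictException e) {
        last = e; // another writer won the race; re-read and try again
      }
    }
    throw last;
  }

  /**
   * Re-reads the table on every attempt and commits only still-missing columns.
   * onFirstAttempt is a test hook that simulates a racing writer.
   * Returns the number of columns this call actually committed.
   */
  static int addMissingColumns(
      FakeTable table, List<String> requested, Runnable onFirstAttempt, int maxAttempts) {
    int[] attempt = {0};
    return retryOnConflict(maxAttempts, () -> {
      attempt[0]++;
      long base = table.version();
      Set<String> existing = table.columns();
      List<String> missing = new ArrayList<>();
      for (String name : requested) {
        if (!existing.contains(name)) {
          missing.add(name);
        }
      }
      if (missing.isEmpty()) {
        return 0; // already up to date, e.g. another task added the column
      }
      if (attempt[0] == 1) {
        onFirstAttempt.run();
      }
      table.commit(base, missing);
      return missing.size();
    });
  }
}
```

With a racing writer, the first commit conflicts and the second attempt finds the column already present, so nothing is committed and nothing is swallowed. A blank column name fails validation on the first attempt and propagates instead of being retried or ignored.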
--
This is an automated message from the Apache Git Service.