Blazer-007 commented on code in PR #4142:
URL: https://github.com/apache/gobblin/pull/4142#discussion_r2436688045
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -304,6 +306,13 @@ protected void overwritePartition(List<DataFile> dataFiles, String partitionColN
} else {
log.warn("~{}~ No current snapshot found before overwrite", tableId);
}
+
+ updateSchema(updatedSchema, false);
+ overwritePartition(dataFiles, partitionColName, partitionValue);
Review Comment:
Should these two calls be part of a single table transaction commit?
Is there any issue with using a transaction here?
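
To make the concern concrete, here is a minimal stdlib-only sketch of the idea. `ToyTable` and `ToyTransaction` are hypothetical stand-ins, not Gobblin or Iceberg classes; in Iceberg the analogous entry points would be `Table.newTransaction()` and `Transaction.commitTransaction()`. The sketch only illustrates that staging both changes and applying them in one commit avoids a state where the schema is updated but the partition is not.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory table (hypothetical): a schema label plus one partition's data files. */
final class ToyTable {
  private String schema;
  private List<String> files;

  ToyTable(String schema, List<String> files) {
    this.schema = schema;
    this.files = new ArrayList<>(files);
  }

  synchronized String schema() { return schema; }

  synchronized List<String> files() { return new ArrayList<>(files); }

  ToyTransaction newTransaction() { return new ToyTransaction(this); }

  // Applies both changes under one lock, so readers never see only one of them.
  synchronized void apply(String newSchema, List<String> newFiles) {
    this.schema = newSchema;
    this.files = new ArrayList<>(newFiles);
  }
}

/** Stages a schema update and a partition overwrite, then commits both or neither. */
final class ToyTransaction {
  private final ToyTable table;
  private String stagedSchema;
  private List<String> stagedFiles;

  ToyTransaction(ToyTable table) { this.table = table; }

  ToyTransaction updateSchema(String newSchema) {
    this.stagedSchema = newSchema;
    return this;
  }

  ToyTransaction overwritePartition(List<String> newFiles) {
    this.stagedFiles = new ArrayList<>(newFiles);
    return this;
  }

  void commit() {
    // Validate every staged change before touching the table.
    if (stagedSchema == null || stagedSchema.isEmpty()) {
      throw new IllegalStateException("no schema staged");
    }
    if (stagedFiles == null || stagedFiles.isEmpty()) {
      throw new IllegalStateException("no data files staged");
    }
    table.apply(stagedSchema, stagedFiles);
  }
}
```

With two separate commits, a failure in the overwrite would leave the new schema in place with the old partition data. In the sketch, a failed `commit()` leaves both the schema and the files unchanged.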
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java:
##########
@@ -192,14 +192,18 @@ private Path addUUIDToPath(String filePathStr) {
}
private PostPublishStep createOverwritePostPublishStep() {
- IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
- this.getDestIcebergTable().getTableId().toString(),
- this.partitionColumnName,
- this.partitionColValue,
- this.properties
- );
-
- return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
+ try {
+ IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
+ this.getDestIcebergTable().getTableId().toString(),
+ this.getSrcIcebergTable().accessTableMetadata().schema(),
+ this.partitionColumnName,
Review Comment:
Fetching the schema after generating the files may lead to corruption in
very high-concurrency scenarios. We should fetch the schema at the same time
as the manifest files, similar to what is done in full table replication.
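
A small stdlib-only sketch of the race, under stated assumptions: `TableState` and `SourceTable` are hypothetical stand-ins for the source table's metadata, not the real Gobblin or Iceberg types. The point is that the schema and the file list should come from one read of the metadata, so a commit that lands in between cannot pair a newer schema with older files.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Immutable view of a table at one point in time (hypothetical stand-in for table metadata). */
final class TableState {
  final String schema;
  final List<String> dataFiles;

  TableState(String schema, List<String> dataFiles) {
    this.schema = schema;
    this.dataFiles = Collections.unmodifiableList(new ArrayList<>(dataFiles));
  }
}

/** Toy source table whose current state is swapped atomically on each commit. */
final class SourceTable {
  private final AtomicReference<TableState> current;

  SourceTable(TableState initial) {
    this.current = new AtomicReference<>(initial);
  }

  // One read returns the schema and the files that belong together.
  TableState readState() {
    return current.get();
  }

  // Simulates a concurrent writer committing a new schema and new files.
  void commit(TableState next) {
    current.set(next);
  }
}
```

If the copy job keeps the `TableState` it read when listing files and later takes the schema from that same object, a concurrent `commit` on the source does not affect the pair. Calling `readState()` a second time, after the files were generated, could instead return a schema that does not match them.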
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -292,7 +294,7 @@ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> iceber
* @param partitionColName the partition column name whose data files are to be replaced
* @param partitionValue the partition column value on which data files will be replaced
*/
- protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
+ protected void updateSchemaAndPartition(List<DataFile> dataFiles, Schema updatedSchema, String partitionColName, String partitionValue)
Review Comment:
Please update the Javadoc here as well.
Also, could the function be renamed to `updateSchemaAndOverwritePartition`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.