Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810287691
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +227,67 @@ protected void registerIcebergTable(TableMetadata
srcMetadata, TableMetadata dst
this.tableOps.commit(dstMetadata, srcMetadata);
}
}
+
+ /**
+ * Retrieves a list of data files from the current snapshot that match the
specified partition filter predicate.
+ *
+ * @param icebergPartitionFilterPredicate the predicate to filter partitions
+ * @return a list of data files that match the partition filter predicate
+ * @throws TableNotFoundException if error occurred while accessing the
table metadata
+ * @throws RuntimeException if error occurred while reading the manifest file
+ */
+ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike>
icebergPartitionFilterPredicate)
+ throws TableNotFoundException {
+ TableMetadata tableMetadata = accessTableMetadata();
+ Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+ long currentSnapshotId = currentSnapshot.snapshotId();
+ List<DataFile> knownDataFiles = new ArrayList<>();
+ log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles",
tableId, currentSnapshotId,
+ knownDataFiles.size());
+ //TODO: Add support for deleteManifests as well later
+ // Currently supporting dataManifests only
+ List<ManifestFile> dataManifestFiles =
currentSnapshot.dataManifests(this.tableOps.io());
+ for (ManifestFile manifestFile : dataManifestFiles) {
+ try (ManifestReader<DataFile> manifestReader =
ManifestFiles.read(manifestFile, this.tableOps.io());
+ CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+ dataFiles.forEachRemaining(dataFile -> {
+ if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+ knownDataFiles.add(dataFile.copy());
+ }
+ });
+ log.info("~{}~ for snapshot '{}' - '{}' total known iceberg
datafiles", tableId, currentSnapshotId,
+ knownDataFiles.size());
+ } catch (IOException e) {
+ String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read
manifest file: %s", tableId,
+ currentSnapshotId, manifestFile.path());
+ log.error(errMsg, e);
+ throw new RuntimeException(errMsg, e);
+ }
+ }
+ return knownDataFiles;
+ }
+
+ /**
+ * Overwrite partition data files in the table for the specified partition
col name & partition value.
+ * <p>
+ * Overwrite partition replaces the partition using the expression filter
provided.
+ * </p>
+ * @param dataFiles the list of data files to replace partitions with
+ * @param partitionColName the partition column name whose data files are to
be replaced
+ * @param partitionValue the partition column value on which data files
will be replaced
+ */
+ protected void overwritePartition(List<DataFile> dataFiles, String
partitionColName, String partitionValue)
+ throws TableNotFoundException {
+ if (dataFiles.isEmpty()) {
+ return;
+ }
+ log.info("~{}~ SnapshotId before overwrite: {}", tableId,
accessTableMetadata().currentSnapshot().snapshotId());
+ OverwriteFiles overwriteFiles = this.table.newOverwrite();
+ overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName,
partitionValue));
+ dataFiles.forEach(overwriteFiles::addFile);
+ overwriteFiles.commit();
+ this.tableOps.refresh();
+ log.info("~{}~ SnapshotId after overwrite: {}", tableId,
accessTableMetadata().currentSnapshot().snapshotId());
Review Comment:
Thanks for the suggestion — I've added it as a note in a comment above that line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]