Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1778488116
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
this.tableOps.commit(dstMetadata, srcMetadata);
}
}
+
+ /**
+ * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+ *
+ * @param icebergPartitionFilterPredicate the predicate to filter partitions
+ * @return a list of data files that match the partition filter predicate
+ * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files
+ */
+ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException {
+ TableMetadata tableMetadata = accessTableMetadata();
+ Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+ List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+ List<DataFile> dataFileList = new ArrayList<>();
+ for (ManifestFile manifestFile : dataManifestFiles) {
+ ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+ CloseableIterator<DataFile> dataFiles = manifestReader.iterator();
+ dataFiles.forEachRemaining(dataFile -> {
+ if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+ dataFileList.add(dataFile.copy());
+ }
+ });
+ }
+ return dataFileList;
+ }
+
+ /**
+ * Replaces partitions in the table with the specified list of data files.
+ *
+ * @param dataFiles the list of data files to replace partitions with
+ */
+ protected void replacePartitions(List<DataFile> dataFiles) {
+ if (dataFiles.isEmpty()) {
+ return;
+ }
+ ReplacePartitions replacePartitions = this.table.newReplacePartitions();
+ dataFiles.forEach(replacePartitions::addFile);
+ replacePartitions.commit();
Review Comment:
I have switched this from the ReplacePartitions API to the Overwrite API,
because ReplacePartitions gave unexpected results when I tested it against
a somewhat complex partitioning.
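
For context, a rough sketch of how the two styles differ in general. This is
a toy in-memory model, not Iceberg code and not the change made in this PR:
`OverwriteSketch`, `ToyFile`, and both static methods are hypothetical names
invented for illustration. It assumes only the documented behaviour that
ReplacePartitions infers the partitions to clear from the files being added,
whereas an overwrite lets the caller state what is removed. It does not try
to reproduce the unexpected results mentioned above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class OverwriteSketch {

  /** Hypothetical stand-in for a data file: a path plus the partition it belongs to. */
  static final class ToyFile {
    final String partition;
    final String path;

    ToyFile(String partition, String path) {
      this.partition = partition;
      this.path = path;
    }
  }

  /** Replace-partitions style: the partitions to clear are inferred from the added files. */
  static List<ToyFile> replacePartitions(List<ToyFile> existing, List<ToyFile> added) {
    Set<String> touched = new HashSet<>();
    for (ToyFile f : added) {
      touched.add(f.partition);
    }
    List<ToyFile> result = new ArrayList<>();
    for (ToyFile f : existing) {
      if (!touched.contains(f.partition)) {
        result.add(f);
      }
    }
    result.addAll(added);
    return result;
  }

  /** Overwrite style: the caller states explicitly which existing files are dropped. */
  static List<ToyFile> overwrite(List<ToyFile> existing, Predicate<ToyFile> dropFilter,
      List<ToyFile> added) {
    List<ToyFile> result = new ArrayList<>();
    for (ToyFile f : existing) {
      if (!dropFilter.test(f)) {
        result.add(f);
      }
    }
    result.addAll(added);
    return result;
  }

  public static void main(String[] args) {
    List<ToyFile> existing = new ArrayList<>();
    existing.add(new ToyFile("day=1", "a.parquet"));
    existing.add(new ToyFile("day=2", "b.parquet"));
    List<ToyFile> none = new ArrayList<>();

    // Nothing added: there is no partition to infer, so every existing file survives.
    System.out.println(replacePartitions(existing, none).size());
    // Nothing added: the explicit filter still clears the day=1 partition.
    System.out.println(overwrite(existing, f -> f.partition.equals("day=1"), none).size());
  }
}
```

In the toy model the two calls only diverge when the added files do not cover
exactly the partitions the caller meant to replace, which is one reason an
explicit overwrite can be easier to reason about.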