[
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=936301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-936301
]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Sep/24 05:50
Start Date: 25/Sep/24 05:50
Worklog Time Spent: 10m
Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1774541066
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    List<DataFile> dataFileList = new ArrayList<>();
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+      CloseableIterator<DataFile> dataFiles = manifestReader.iterator();
+      dataFiles.forEachRemaining(dataFile -> {
+        if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+          dataFileList.add(dataFile.copy());
+        }
+      });
+    }
+    return dataFileList;
+  }
+
+  /**
+   * Replaces partitions in the table with the specified list of data files.
+   *
+   * @param dataFiles the list of data files to replace partitions with
+   */
+  protected void replacePartitions(List<DataFile> dataFiles) {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    ReplacePartitions replacePartitions = this.table.newReplacePartitions();
+    dataFiles.forEach(replacePartitions::addFile);
+    replacePartitions.commit();
Review Comment:
Yes, it deletes every partition that it gets from the data files via
dataFile.partition(), then adds the new files in their place.
Will add a note about this too.
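To make the behavior above concrete, here is a minimal sketch in plain Java. It is a toy model, not Iceberg code: partitions are plain strings, data files are file names, and `ReplacePartitionsSketch`, `FileInPartition`, and the sample paths are hypothetical names made up for illustration. It only mimics the idea that every partition named by an incoming file is emptied before the new files are added, while other partitions are left alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: it imitates the replace-by-partition behavior discussed above.
// None of these types come from Iceberg or Gobblin.
final class ReplacePartitionsSketch {

  /** Hypothetical stand-in for a data file: a path plus the partition it belongs to. */
  static final class FileInPartition {
    final String partition;
    final String path;

    FileInPartition(String partition, String path) {
      this.partition = partition;
      this.path = path;
    }
  }

  /**
   * Returns a new table state in which every partition touched by newFiles holds
   * only the new files; partitions not named by any new file are carried over unchanged.
   */
  static Map<String, List<String>> replacePartitions(
      Map<String, List<String>> table, List<FileInPartition> newFiles) {
    // Copy the input so the caller's table is never mutated.
    Map<String, List<String>> result = new TreeMap<>();
    table.forEach((partition, paths) -> result.put(partition, new ArrayList<>(paths)));
    if (newFiles.isEmpty()) {
      return result; // same early exit as in the diff: nothing to replace
    }
    // Step 1: drop the existing contents of each partition named by a new file.
    for (FileInPartition file : newFiles) {
      result.remove(file.partition);
    }
    // Step 2: add the new files under their partitions.
    for (FileInPartition file : newFiles) {
      result.computeIfAbsent(file.partition, p -> new ArrayList<>()).add(file.path);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> table = new TreeMap<>();
    table.put("2024-09-23", Arrays.asList("old-a.parquet"));
    table.put("2024-09-24", Arrays.asList("old-b.parquet", "old-c.parquet"));

    List<FileInPartition> incoming =
        Arrays.asList(new FileInPartition("2024-09-24", "new-b.parquet"));

    System.out.println(replacePartitions(table, incoming));
  }
}
```

In this sketch, partition 2024-09-24 ends up with only new-b.parquet, and partition 2024-09-23 keeps old-a.parquet. The point for callers is that a partial list of files for a partition still wipes that whole partition.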
Issue Time Tracking
-------------------
Worklog Id: (was: 936301)
Time Spent: 3h (was: 2h 50m)
> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-2159
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)