[ 
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=936780&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-936780
 ]

ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Sep/24 11:43
            Start Date: 27/Sep/24 11:43
    Worklog Time Spent: 10m 
      Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1778488116


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +231,44 @@ protected void registerIcebergTable(TableMetadata 
srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the 
specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table 
metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> 
icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<ManifestFile> dataManifestFiles = 
currentSnapshot.dataManifests(this.tableOps.io());
+    List<DataFile> dataFileList = new ArrayList<>();
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      ManifestReader<DataFile> manifestReader = 
ManifestFiles.read(manifestFile, this.tableOps.io());
+      CloseableIterator<DataFile> dataFiles = manifestReader.iterator();
+      dataFiles.forEachRemaining(dataFile -> {
+        if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+          dataFileList.add(dataFile.copy());
+        }
+      });
+    }
+    return dataFileList;
+  }
+
+  /**
+   * Replaces partitions in the table with the specified list of data files.
+   *
+   * @param dataFiles the list of data files to replace partitions with
+   */
+  protected void replacePartitions(List<DataFile> dataFiles) {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    ReplacePartitions replacePartitions = this.table.newReplacePartitions();
+    dataFiles.forEach(replacePartitions::addFile);
+    replacePartitions.commit();

Review Comment:
   I have changed this API to use Overwrite API now as while testing on a bit 
of complex partition it is giving unusual results.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 936780)
    Time Spent: 3h 40m  (was: 3.5h)

> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
>                 Key: GOBBLIN-2159
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to