[ https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=938562&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-938562 ]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 17/Oct/24 04:50
Start Date: 17/Oct/24 04:50
Worklog Time Spent: 10m
Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1804100436
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +227,59 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions
+   * @return a list of data files that match the partition filter predicate
+   * @throws IOException if an I/O error occurs while accessing the table metadata or reading manifest files
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate) throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    log.info("Starting to copy data files from snapshot: {}", currentSnapshot.snapshotId());
+    //TODO: Add support for deleteManifests as well later
+    // Currently supporting dataManifests only
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    List<DataFile> dataFileList = new ArrayList<>();
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+          CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+        dataFiles.forEachRemaining(dataFile -> {
+          if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+            dataFileList.add(dataFile.copy());
+          }
+        });
+      } catch (IOException e) {
+        log.warn("Failed to read manifest file: {} " , manifestFile.path(), e);
+      }
Review Comment:
Yeah, I completely agree with your suggestion; somehow I missed it. Let me
correct it by failing the copy with proper logging.
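
For illustration only, here is a minimal sketch of what "failing the copy" could look like. It is not the change made in the PR. To stay self-contained it avoids the Iceberg classes entirely: `ManifestReadSketch`, `ManifestSource`, `listMatching`, and `source` are hypothetical names invented for this example, and partitions are modeled as plain strings. The point is only that the `IOException` is rethrown with the manifest path attached, so the caller never gets a silently incomplete file list.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class ManifestReadSketch {

  /** Hypothetical stand-in for one manifest file; this is not the Iceberg API. */
  public interface ManifestSource {
    String path();
    List<String> readPartitions() throws IOException;
  }

  /**
   * Collects the partitions that pass the filter across all manifests.
   * A read failure aborts the whole listing instead of being logged and skipped.
   */
  public static List<String> listMatching(List<ManifestSource> manifests,
      Predicate<String> partitionFilter) throws IOException {
    List<String> matched = new ArrayList<>();
    for (ManifestSource manifest : manifests) {
      try {
        for (String partition : manifest.readPartitions()) {
          if (partitionFilter.test(partition)) {
            matched.add(partition);
          }
        }
      } catch (IOException e) {
        // Attach the manifest path for context, then propagate so the copy fails.
        throw new IOException("Failed to read manifest file: " + manifest.path(), e);
      }
    }
    return matched;
  }

  /** Test helper (hypothetical): an in-memory manifest that can simulate a read failure. */
  public static ManifestSource source(final String path, final List<String> partitions,
      final boolean failOnRead) {
    return new ManifestSource() {
      @Override
      public String path() {
        return path;
      }

      @Override
      public List<String> readPartitions() throws IOException {
        if (failOnRead) {
          throw new IOException("simulated read failure");
        }
        return partitions;
      }
    };
  }

  public static void main(String[] args) {
    List<ManifestSource> manifests = Arrays.asList(
        source("manifest-1.avro", Arrays.asList("dt=2024-10-16", "dt=2024-10-17"), false),
        source("manifest-2.avro", Arrays.asList("dt=2024-10-17"), true));
    try {
      System.out.println(listMatching(manifests, p -> p.equals("dt=2024-10-17")));
    } catch (IOException e) {
      // The second manifest fails, so the whole listing is rejected.
      System.out.println("Copy aborted: " + e.getMessage());
    }
  }
}
```

Since the method under review already declares `throws IOException`, rethrowing would presumably need no signature change; whether to wrap the exception or let it propagate unchanged is a judgment call.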
Issue Time Tracking
-------------------
Worklog Id: (was: 938562)
Time Spent: 5h (was: 4h 50m)
> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-2159
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)