[
https://issues.apache.org/jira/browse/GOBBLIN-2159?focusedWorklogId=939414&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-939414
]
ASF GitHub Bot logged work on GOBBLIN-2159:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Oct/24 09:00
Start Date: 22/Oct/24 09:00
Worklog Time Spent: 10m
Work Description: Blazer-007 commented on code in PR #4058:
URL: https://github.com/apache/gobblin/pull/4058#discussion_r1810287691
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java:
##########
@@ -217,4 +227,67 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst
this.tableOps.commit(dstMetadata, srcMetadata);
}
}
+
+ /**
+ * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+ *
+ * @param icebergPartitionFilterPredicate the predicate to filter partitions
+ * @return a list of data files that match the partition filter predicate
+ * @throws TableNotFoundException if error occurred while accessing the table metadata
+ * @throws RuntimeException if error occurred while reading the manifest file
+ */
+ public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
+ throws TableNotFoundException {
+ TableMetadata tableMetadata = accessTableMetadata();
+ Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+ long currentSnapshotId = currentSnapshot.snapshotId();
+ List<DataFile> knownDataFiles = new ArrayList<>();
+ log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId,
+ knownDataFiles.size());
+ //TODO: Add support for deleteManifests as well later
+ // Currently supporting dataManifests only
+ List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+ for (ManifestFile manifestFile : dataManifestFiles) {
+ try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+ CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+ dataFiles.forEachRemaining(dataFile -> {
+ if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+ knownDataFiles.add(dataFile.copy());
+ }
+ });
+ log.info("~{}~ for snapshot '{}' - '{}' total known iceberg datafiles", tableId, currentSnapshotId,
+ knownDataFiles.size());
+ } catch (IOException e) {
+ String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId,
+ currentSnapshotId, manifestFile.path());
+ log.error(errMsg, e);
+ throw new RuntimeException(errMsg, e);
+ }
+ }
+ return knownDataFiles;
+ }
+
+ /**
+ * Overwrite partition data files in the table for the specified partition col name & partition value.
+ * <p>
+ * Overwrite partition replaces the partition using the expression filter provided.
+ * </p>
+ * @param dataFiles the list of data files to replace partitions with
+ * @param partitionColName the partition column name whose data files are to be replaced
+ * @param partitionValue the partition column value on which data files will be replaced
+ */
+ protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
+ throws TableNotFoundException {
+ if (dataFiles.isEmpty()) {
+ return;
+ }
+ log.info("~{}~ SnapshotId before overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
+ OverwriteFiles overwriteFiles = this.table.newOverwrite();
+ overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
+ dataFiles.forEach(overwriteFiles::addFile);
+ overwriteFiles.commit();
+ this.tableOps.refresh();
+ log.info("~{}~ SnapshotId after overwrite: {}", tableId, accessTableMetadata().currentSnapshot().snapshotId());
Review Comment:
Thanks for the suggestion. I've added it as a note in a comment before this line.
Issue Time Tracking
-------------------
Worklog Id: (was: 939414)
Time Spent: 8h 10m (was: 8h)
> Support Partition Based Copy in Iceberg Distcp
> ----------------------------------------------
>
> Key: GOBBLIN-2159
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2159
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vivek Rai
> Priority: Major
> Time Spent: 8h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)