[ https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=814840&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814840 ]
ASF GitHub Bot logged work on GOBBLIN-1707:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Oct/22 23:10
Start Date: 07/Oct/22 23:10
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990545035
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
return copyEntities;
}
+ /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+ @RequiredArgsConstructor
+ public static class WrappedIOException extends RuntimeException {
+ @Getter
+ private final IOException wrappedException;
+
+ public void rethrowWrapped() throws IOException {
+ throw wrappedException;
+ }
+ }
+
/**
* Finds all files of the Iceberg's current snapshot
* Returns a map of path, file status for each file that needs to be copied
*/
- protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
- Map<Path, FileStatus> result = Maps.newHashMap();
+ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+ Map<Path, FileStatus> results = Maps.newHashMap();
IcebergTable icebergTable = this.getIcebergTable();
+ // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+ IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+ if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+ isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+ log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+ dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+ currentSnapshotOverview.getManifestListPath(),
+ currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+ return results;
+ }
Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
Iterator<String> filePathsIterator = Iterators.concat(
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
- // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
- log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
- snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
- return snapshotInfo.getAllPaths().iterator();
+ // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+ String manListPath = snapshotInfo.getManifestListPath();
+ log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+ snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+ // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+ // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend
+ // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+ // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+ // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+ // hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply
+ // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+ // metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least
+ // some of the children pointed to within could have been copied prior, when they previously appeared as a
+ // child of the current file's predecessor (which this new meta file now replaces).
+ if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+ List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+ for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
Review Comment:
Or we can add more clarity in
IcebergTable.getIncrementalSnapshotInfosIterator(). At least, I was missing the
assumption that it returns the snapshots from oldest to latest, and that each
file appears only once, in the earliest snapshot that contains it. Might be
just me though...
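For context, the short-circuit idea described in the ALGO comment could be sketched roughly as below. This is only an illustration under stated assumptions, not the code in the PR: the `ShortCircuitSketch`, `Snapshot`, and `Manifest` names are hypothetical stand-ins, target presence is modeled as a plain set of path strings rather than a `FileSystem` lookup, and the snapshots are assumed to arrive ordered oldest to latest.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ShortCircuitSketch {
  /** Hypothetical stand-in for one manifest and the data files it lists. */
  static class Manifest {
    final String path;
    final List<String> dataFiles;
    Manifest(String path, List<String> dataFiles) {
      this.path = path;
      this.dataFiles = dataFiles;
    }
  }

  /** Hypothetical stand-in for one snapshot: its manifest list plus its manifests. */
  static class Snapshot {
    final String manifestListPath;
    final List<Manifest> manifests;
    Snapshot(String manifestListPath, List<Manifest> manifests) {
      this.manifestListPath = manifestListPath;
      this.manifests = manifests;
    }
  }

  /**
   * Walks snapshots (assumed ordered oldest to latest) and collects paths absent from the target.
   * A file present on the target short-circuits its whole subtree; an absent file does not imply
   * that its children are absent, so each child is still checked individually.
   */
  static List<String> pathsToCopy(List<Snapshot> oldestToLatest, Set<String> presentOnTarget) {
    // LinkedHashSet keeps encounter order and drops repeats, so the sketch does not rely on
    // each file appearing in only one snapshot.
    Set<String> missing = new LinkedHashSet<>();
    for (Snapshot snapshot : oldestToLatest) {
      if (presentOnTarget.contains(snapshot.manifestListPath)) {
        continue; // manifest list already copied, so its whole subtree is presumed copied
      }
      missing.add(snapshot.manifestListPath);
      for (Manifest manifest : snapshot.manifests) {
        if (presentOnTarget.contains(manifest.path)) {
          continue; // manifest already copied, so its data files are presumed copied
        }
        missing.add(manifest.path);
        for (String dataFile : manifest.dataFiles) {
          if (!presentOnTarget.contains(dataFile)) {
            missing.add(dataFile);
          }
        }
      }
    }
    return new ArrayList<>(missing);
  }

  public static void main(String[] args) {
    // Snapshot 2 replaces manifest m1 with m2, which still points at the already-copied d2.
    Snapshot s1 = new Snapshot("ml1", Arrays.asList(new Manifest("m1", Arrays.asList("d1", "d2"))));
    Snapshot s2 = new Snapshot("ml2", Arrays.asList(new Manifest("m2", Arrays.asList("d2", "d3"))));
    Set<String> target = new HashSet<>(Arrays.asList("ml1", "m1", "d1", "d2"));
    System.out.println(pathsToCopy(Arrays.asList(s1, s2), target)); // prints [ml2, m2, d3]
  }
}
```

In the example, the replacement manifest `m2` is missing from the target, yet its child `d2` was already copied under its predecessor `m1`, which is the case the ALGO comment calls out.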
Issue Time Tracking
-------------------
Worklog Id: (was: 814840)
Time Spent: 3h 20m (was: 3h 10m)
> Add Iceberg support to DistCp
> -----------------------------
>
> Key: GOBBLIN-1707
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1707
> Project: Apache Gobblin
> Issue Type: Task
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Add capability for iceberg copy/replication to distcp. Support incremental
> copy (only of delta changes since last time) in addition to full copy on
> first time.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)