[
https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=814829&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814829
]
ASF GitHub Bot logged work on GOBBLIN-1707:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Oct/22 22:34
Start Date: 07/Oct/22 22:34
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990534636
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
return copyEntities;
}
+ /** Not intended to escape this class... yet `public` visibility in case it somehow does */
+ @RequiredArgsConstructor
+ public static class WrappedIOException extends RuntimeException {
+ @Getter
+ private final IOException wrappedException;
+
+ public void rethrowWrapped() throws IOException {
+ throw wrappedException;
+ }
+ }
+
/**
* Finds all files of the Iceberg's current snapshot
* Returns a map of path, file status for each file that needs to be copied
*/
- protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
- Map<Path, FileStatus> result = Maps.newHashMap();
+ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+ Map<Path, FileStatus> results = Maps.newHashMap();
IcebergTable icebergTable = this.getIcebergTable();
+ // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
+ IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
+ if (currentSnapshotOverview.getMetadataPath().map(p -> isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+ isPathPresentOnTarget(new Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+ log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
+ dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+ currentSnapshotOverview.getManifestListPath(),
+ currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
+ return results;
+ }
Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = icebergTable.getIncrementalSnapshotInfosIterator();
Iterator<String> filePathsIterator = Iterators.concat(
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
- // TODO: decide: is it too much to print for every snapshot--instead use `.debug`?
- log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
- snapshotInfo.getSnapshotId(), snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
- return snapshotInfo.getAllPaths().iterator();
+ // log each snapshot, for context, in case of `FileNotFoundException` during `FileSystem.getFileStatus()`
+ String manListPath = snapshotInfo.getManifestListPath();
+ log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path: '{}'", dbName, inputTableName,
+ snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+ // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
+ // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend
+ // also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
+ // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
+ // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
+ // hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply
+ // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
+ // metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least
+ // some of the children pointed to within could have been copied prior, when they previously appeared as a
+ // child of the current file's predecessor (which this new meta file now replaces).
+ if (!isPathPresentOnTarget(new Path(manListPath), targetFs, copyConfig)) {
+ List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+ for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
Review Comment:
Those are definitely the semantics of
`IcebergTable.getIncrementalSnapshotInfosIterator()`, and they are already in its
javadoc. Where would you like me to repeat them here?
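
For readers following the `ALGO` comment in the diff above, here is a minimal sketch of the short-circuit rule it describes. It is not the PR's implementation: the `Node` type, the `findMissing` helper, and the in-memory `presentOnTarget` set are hypothetical stand-ins for the iceberg file tree and for `isPathPresentOnTarget`. It only illustrates the two halves of the rule: a file present at the destination lets its whole subtree be skipped, while an absent file still requires checking each child individually.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ShortCircuitCopySketch {
  /** Hypothetical stand-in for one immutable, uniquely named file and the files it points to. */
  public static final class Node {
    final String path;
    final List<Node> children;

    public Node(String path, Node... children) {
      this.path = path;
      this.children = Arrays.asList(children);
    }
  }

  /** Returns, in pre-order, the paths that still need copying. */
  public static List<String> findMissing(Node root, Set<String> presentOnTarget) {
    List<String> missing = new ArrayList<>();
    collect(root, presentOnTarget, missing);
    return missing;
  }

  private static void collect(Node node, Set<String> presentOnTarget, List<String> missing) {
    if (presentOnTarget.contains(node.path)) {
      // Presence implies a prior atomic copy finished, so the entire subtree is already there.
      return;
    }
    missing.add(node.path);
    for (Node child : node.children) {
      // Absence of the parent proves nothing about the children: each may have been
      // copied earlier under a predecessor file, so every child is checked on its own.
      collect(child, presentOnTarget, missing);
    }
  }

  public static void main(String[] args) {
    // A replaced manifest-list "ml2" that still points at an already-copied manifest "m1".
    Node m1 = new Node("m1", new Node("d1"), new Node("d2"));
    Node m2 = new Node("m2", new Node("d3"));
    Node ml2 = new Node("ml2", m1, m2);
    Set<String> present = new HashSet<>(Arrays.asList("m1", "d1", "d2"));
    System.out.println(findMissing(ml2, present)); // prints [ml2, m2, d3]
  }
}
```

In this sketch the new manifest-list is absent from the target, yet only one of its two manifests needs copying, which is the compaction case the comment calls out.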
Issue Time Tracking
-------------------
Worklog Id: (was: 814829)
Time Spent: 3h (was: 2h 50m)
> Add Iceberg support to DistCp
> -----------------------------
>
> Key: GOBBLIN-1707
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1707
> Project: Apache Gobblin
> Issue Type: Task
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 3h
> Remaining Estimate: 0h
>
> Add capability for iceberg copy/replication to distcp. Support incremental
> copy (only of delta changes since last time) in addition to full copy on
> first time.