[
https://issues.apache.org/jira/browse/GOBBLIN-1707?focusedWorklogId=814825&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-814825
]
ASF GitHub Bot logged work on GOBBLIN-1707:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 07/Oct/22 22:31
Start Date: 07/Oct/22 22:31
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3575:
URL: https://github.com/apache/gobblin/pull/3575#discussion_r990533651
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java:
##########
@@ -153,30 +156,105 @@ Collection<CopyEntity> generateCopyEntities(FileSystem
targetFs, CopyConfigurati
return copyEntities;
}
+ /** Not intended to escape this class... yet `public` visibility in case it
somehow does */
+ @RequiredArgsConstructor
+ public static class WrappedIOException extends RuntimeException {
+ @Getter
+ private final IOException wrappedException;
+
+ public void rethrowWrapped() throws IOException {
+ throw wrappedException;
+ }
+ }
+
/**
* Finds all files of the Iceberg's current snapshot
* Returns a map of path, file status for each file that needs to be copied
*/
- protected Map<Path, FileStatus> getFilePathsToFileStatus() throws
IOException {
- Map<Path, FileStatus> result = Maps.newHashMap();
+ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem
targetFs, CopyConfiguration copyConfig) throws IOException {
+ Map<Path, FileStatus> results = Maps.newHashMap();
IcebergTable icebergTable = this.getIcebergTable();
+ // check first for case of nothing to replicate, to avoid needless
scanning of a potentially massive iceberg
+ IcebergSnapshotInfo currentSnapshotOverview =
icebergTable.getCurrentSnapshotInfoOverviewOnly();
+ if (currentSnapshotOverview.getMetadataPath().map(p ->
isPathPresentOnTarget(new Path(p), targetFs, copyConfig)).orElse(false) &&
+ isPathPresentOnTarget(new
Path(currentSnapshotOverview.getManifestListPath()), targetFs, copyConfig)) {
+ log.info("{}.{} - skipping entire iceberg, since snapshot '{}' at '{}'
and metadata '{}' both present on target",
+ dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+ currentSnapshotOverview.getManifestListPath(),
+ currentSnapshotOverview.getMetadataPath().orElse("<<ERROR:
MISSING!>>"));
+ return results;
+ }
Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos =
icebergTable.getIncrementalSnapshotInfosIterator();
Iterator<String> filePathsIterator = Iterators.concat(
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
- // TODO: decide: is it too much to print for every snapshot--instead
use `.debug`?
- log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'",
dbName, inputTableName,
- snapshotInfo.getSnapshotId(),
snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
- return snapshotInfo.getAllPaths().iterator();
+ // log each snapshot, for context, in case of
`FileNotFoundException` during `FileSystem.getFileStatus()`
+ String manListPath = snapshotInfo.getManifestListPath();
+ log.info("{}.{} - loaded snapshot '{}' at '{}' from metadata path:
'{}'", dbName, inputTableName,
+ snapshotInfo.getSnapshotId(), manListPath,
snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
+ // ALGO: an iceberg's files form a tree of four levels:
metadata.json -> manifest-list -> manifest -> data;
+ // most critically, all are presumed immutable and uniquely named,
although any may be replaced. we depend
+ // also on incremental copy being run always atomically: to commit
each iceberg only upon its full success.
+ // thus established, the presence of a file at dest (identified by
path/name) guarantees its entire subtree is
+ // already copied--and, given immutability, completion of a prior
copy naturally renders that file up-to-date.
+ // hence, its entire subtree may be short-circuited. nevertheless,
absence of a file at dest cannot imply
+ // its entire subtree necessarily requires copying, because it is
possible, even likely in practice, that some
+ // metadata files would have been replaced (e.g. during snapshot
compaction). in such instances, at least
+ // some of the children pointed to within could have been copied
prior, when they previously appeared as a
+ // child of the current file's predecessor (which this new meta file
now replaces).
+ if (!isPathPresentOnTarget(new Path(manListPath), targetFs,
copyConfig)) {
+ List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+ for (IcebergSnapshotInfo.ManifestFileInfo mfi :
snapshotInfo.getManifestFiles()) {
+ if (!isPathPresentOnTarget(new Path(mfi.getManifestFilePath()),
targetFs, copyConfig)) {
+ missingPaths.add(mfi.getManifestFilePath());
+ mfi.getListedFilePaths().stream().filter(p ->
+ !isPathPresentOnTarget(new Path(p), targetFs, copyConfig)
+ ).forEach(missingPaths::add);
+ }
+ }
+ return missingPaths.iterator();
+ } else {
+ log.info("{}.{} - snapshot '{}' already present on target...
skipping (with contents)",
+ dbName, inputTableName, snapshotInfo.getSnapshotId());
+ // IMPORTANT: separately consider metadata path, to handle case of
'metadata-only' snapshot reusing mf-list
+ Optional<String> nonReplicatedMetadataPath =
snapshotInfo.getMetadataPath().filter(p ->
+ !isPathPresentOnTarget(new Path(p), targetFs, copyConfig));
+ log.info("{}.{} - metadata is {}already present on target",
dbName, inputTableName, nonReplicatedMetadataPath.isPresent() ? "NOT " : "");
+ return nonReplicatedMetadataPath.map(p ->
Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+ }
})
);
Iterable<String> filePathsIterable = () -> filePathsIterator;
- // TODO: investigate whether streaming initialization of `Map`
preferable--`getFileStatus` network calls would
- // likely benefit from parallelism
- for (String pathString : filePathsIterable) {
- Path path = new Path(pathString);
- result.put(path, this.sourceFs.getFileStatus(path));
+ try {
+ // TODO: investigate whether streaming initialization of `Map`
preferable--`getFileStatus()` network calls likely
+ // to benefit from parallelism
+ for (String pathString : filePathsIterable) {
+ try {
+ Path path = new Path(pathString);
+ results.put(path, this.sourceFs.getFileStatus(path));
+ } catch (FileNotFoundException fnfe) {
+ if (!shouldTolerateMissingSourceFiles) {
+ throw fnfe;
+ } else {
+ // log, but otherwise swallow... to continue on
+ log.warn("MIA source file... did premature deletion subvert
time-travel or maybe metadata read interleaved with delete?", fnfe);
+ }
+ }
+ }
+ } catch (WrappedIOException wrapper) {
+ wrapper.rethrowWrapped();
+ }
+ return results;
+ }
+
+ /** @returns whether `path` is present on `targetFs`, tunneling checked
exceptions and caching results throughout */
+ protected static boolean isPathPresentOnTarget(Path path, FileSystem
targetFs, CopyConfiguration copyConfig) {
+ try {
+ // omit considering timestamp (or other markers of freshness), as files
should be immutable
Review Comment:
It's technically possible to change files in place, but doing so breaks the
Iceberg table's repeatability. It's not something we should ever encourage;
instead, write new files and create a snapshot with those that replaces the
original ones! The real issue with in-place modifications to data files is that
every delta copy must devolve into a full comparison of the `FileStatus` (between
source and destination) for the entire Iceberg table. That's a huge amount of
effort in some cases... all because of misbehaving writers/updaters.
I suggest that if we do find we're working with writers that do this, we can
return here to add the necessary complexity, likely controlled via
configuration so that it's not always on.
Issue Time Tracking
-------------------
Worklog Id: (was: 814825)
Time Spent: 2h 40m (was: 2.5h)
> Add Iceberg support to DistCp
> -----------------------------
>
> Key: GOBBLIN-1707
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1707
> Project: Apache Gobblin
> Issue Type: Task
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 2h 40m
> Remaining Estimate: 0h
>
> Add Iceberg copy/replication capability to DistCp. Support incremental
> copy (only the changes since the last run) in addition to a full copy
> on the first run.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)