ajfabbri commented on a change in pull request #843: HADOOP-15183 S3Guard store becomes inconsistent after partial failure of rename
URL: https://github.com/apache/hadoop/pull/843#discussion_r288854348
##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -1225,130 +1301,292 @@ private boolean innerRename(Path source, Path dest)
       }
     }
-    // If we have a MetadataStore, track deletions/creations.
-    Collection<Path> srcPaths = null;
-    List<PathMetadata> dstMetas = null;
-    if (hasMetadataStore()) {
-      srcPaths = new HashSet<>(); // srcPaths need fast look up before put
-      dstMetas = new ArrayList<>();
-    }
-    // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
+    // Validation completed: time to begin the operation.
+    // The store-specific rename operation is used to keep the store
+    // up to date with the in-progress operation.
+    // for the null store, these are all no-ops.
+    final RenameTracker renameTracker =
+        metadataStore.initiateRenameOperation(
+            createStoreContext(),
+            src, srcStatus, dest);
+    final AtomicLong bytesCopied = new AtomicLong();
+    int renameParallelLimit = RENAME_PARALLEL_LIMIT;
+    final List<CompletableFuture<Path>> activeCopies =
+        new ArrayList<>(renameParallelLimit);
+    // aggregate operation to wait for the copies to complete then reset
+    // the list.
+    final FunctionsRaisingIOE.FunctionRaisingIOE<String, Void>
+        completeActiveCopies = (String reason) -> {
+          LOG.debug("Waiting for {} active copies to complete: {}",
+              activeCopies.size(), reason);
+          waitForCompletion(activeCopies);
+          activeCopies.clear();
+          return null;
+        };
+    // TODO S3Guard: performance: mark destination dirs as authoritative
     // Ok! Time to start
-    if (srcStatus.isFile()) {
-      LOG.debug("rename: renaming file {} to {}", src, dst);
-      long length = srcStatus.getLen();
-      S3ObjectAttributes objectAttributes =
-          createObjectAttributes(srcStatus.getPath(),
-              srcStatus.getETag(), srcStatus.getVersionId());
-      S3AReadOpContext readContext = createReadContext(srcStatus, inputPolicy,
-          changeDetectionPolicy, readAhead);
-      if (dstStatus != null && dstStatus.isDirectory()) {
-        String newDstKey = maybeAddTrailingSlash(dstKey);
-        String filename =
-            srcKey.substring(pathToKey(src.getParent()).length()+1);
-        newDstKey = newDstKey + filename;
-        CopyResult copyResult = copyFile(srcKey, newDstKey, length,
-            objectAttributes, readContext);
-        S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
-            keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
-            username, copyResult.getETag(), copyResult.getVersionId());
+    try {
+      if (srcStatus.isFile()) {
+        // the source is a file.
+        Path copyDestinationPath = dst;
+        String copyDestinationKey = dstKey;
+        S3ObjectAttributes sourceAttributes =
+            createObjectAttributes(srcStatus);
+        S3AReadOpContext readContext = createReadContext(srcStatus, inputPolicy,
+            changeDetectionPolicy, readAhead);
+        if (dstStatus != null && dstStatus.isDirectory()) {
+          // destination is a directory: build the final destination underneath
+          String newDstKey = maybeAddTrailingSlash(dstKey);
+          String filename =
+              srcKey.substring(pathToKey(src.getParent()).length() + 1);
+          newDstKey = newDstKey + filename;
+          copyDestinationKey = newDstKey;
+          copyDestinationPath = keyToQualifiedPath(newDstKey);
+        }
+        // destination either does not exist or is a file to overwrite.
+        LOG.debug("rename: renaming file {} to {}", src, copyDestinationPath);
+        copySourceAndUpdateTracker(renameTracker,
+            src,
+            srcKey,
+            sourceAttributes,
+            readContext,
+            copyDestinationPath,
+            copyDestinationKey,
+            false);
+        bytesCopied.addAndGet(srcStatus.getLen());
+        // delete the source
+        deleteObjectAtPath(src, srcKey, true);
+        // and update the tracker
+        renameTracker.sourceObjectsDeleted(Lists.newArrayList(src));
       } else {
-        CopyResult copyResult = copyFile(srcKey, dstKey, srcStatus.getLen(),
-            objectAttributes, readContext);
-        S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, dst,
-            length, getDefaultBlockSize(dst), username,
-            copyResult.getETag(), copyResult.getVersionId());
-      }
-      innerDelete(srcStatus, false);
-    } else {
-      LOG.debug("rename: renaming directory {} to {}", src, dst);
-
-      // This is a directory to directory copy
-      dstKey = maybeAddTrailingSlash(dstKey);
-      srcKey = maybeAddTrailingSlash(srcKey);
+        LOG.debug("rename: renaming directory {} to {}", src, dst);
-      //Verify dest is not a child of the source directory
-      if (dstKey.startsWith(srcKey)) {
-        throw new RenameFailedException(srcKey, dstKey,
-            "cannot rename a directory to a subdirectory of itself ");
-      }
+        // This is a directory-to-directory copy
+        dstKey = maybeAddTrailingSlash(dstKey);
+        srcKey = maybeAddTrailingSlash(srcKey);
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
-      if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
-        // delete unnecessary fake directory.
-        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
-      }
+        //Verify dest is not a child of the source directory
+        if (dstKey.startsWith(srcKey)) {
+          throw new RenameFailedException(srcKey, dstKey,
+              "cannot rename a directory to a subdirectory of itself ");
+        }
-      Path parentPath = keyToQualifiedPath(srcKey);
-      RemoteIterator<S3ALocatedFileStatus> iterator =
-          listFilesAndEmptyDirectories(parentPath, true);
-      while (iterator.hasNext()) {
-        S3ALocatedFileStatus status = iterator.next();
-        long length = status.getLen();
-        String key = pathToKey(status.getPath());
-        if (status.isDirectory() && !key.endsWith("/")) {
-          key += "/";
+        // These are the lists of keys to delete and of their paths, the
+        // latter being used to update the rename tracker.
+        final List<DeleteObjectsRequest.KeyVersion> keysToDelete =
+            new ArrayList<>();
+        final List<Path> pathsToDelete = new ArrayList<>();
+        // to update the lists of keys and paths.
+        final BiFunction<Path, String, Void> queueToDelete =
+            (Path path, String key) -> {
+              pathsToDelete.add(path);
+              keysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
+              return null;
+            };
+
+        // a lambda-expression to block waiting for any active copies to finish
+        // then delete all queued keys + paths to delete.
+        final FunctionsRaisingIOE.FunctionRaisingIOE<String, Void>
+            completeActiveCopiesAndDeleteSources =
+                (String reason) -> {
+                  completeActiveCopies.apply(reason);
+                  removeSourceObjects(renameTracker,
+                      keysToDelete,
+                      pathsToDelete);
+                  // now reset the lists.
+                  keysToDelete.clear();
+                  pathsToDelete.clear();
+                  return null;
+                };
+
+        if (dstStatus != null
+            && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
+          // delete unnecessary fake directory at the destination.
+          // this MUST be done before anything else so that
+          // rollback code doesn't get confused and insert a tombstone
+          // marker.
+          deleteObjectAtPath(dstStatus.getPath(), dstKey, false);
         }
-        keysToDelete
-            .add(new DeleteObjectsRequest.KeyVersion(key));
-        String newDstKey =
-            dstKey + key.substring(srcKey.length());
-        S3ObjectAttributes objectAttributes =
-            createObjectAttributes(status.getPath(),
-                status.getETag(), status.getVersionId());
-        S3AReadOpContext readContext = createReadContext(status, inputPolicy,
-            changeDetectionPolicy, readAhead);
-        CopyResult copyResult = copyFile(key, newDstKey, length,
-            objectAttributes, readContext);
-
-        if (hasMetadataStore()) {
-          // with a metadata store, the object entries need to be updated,
-          // including, potentially, the ancestors
-          Path childSrc = keyToQualifiedPath(key);
-          Path childDst = keyToQualifiedPath(newDstKey);
-          if (objectRepresentsDirectory(key, length)) {
-            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, username);
-          } else {
-            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, length, getDefaultBlockSize(childDst), username,
-                copyResult.getETag(), copyResult.getVersionId());
+
+        Path parentPath = keyToQualifiedPath(srcKey);
+        final RemoteIterator<S3ALocatedFileStatus> iterator =
+            listFilesAndEmptyDirectories(parentPath, true);
+        while (iterator.hasNext()) {
+          S3ALocatedFileStatus status = iterator.next();
+          String k = pathToKey(status.getPath());
+          String key = (status.isDirectory() && !k.endsWith("/"))
+              ? k + "/"
+              : k;
+          String newDstKey =
+              dstKey + key.substring(srcKey.length());
+          Path childSourcePath = keyToQualifiedPath(key);
+
+          queueToDelete.apply(childSourcePath, key);
+
+          Path childDestPath = keyToQualifiedPath(newDstKey);
+          S3ObjectAttributes sourceAttributes =
+              createObjectAttributes(
+                  status.getPath(),
+                  status.getETag(),
+                  status.getVersionId(),
+                  status.getLen());
+          S3AReadOpContext readContext = createReadContext(status, inputPolicy,
+              changeDetectionPolicy, readAhead);
+          // queue the copy operation for execution in the thread pool
+          CompletableFuture<Path> copy = submit(boundedThreadPool, () ->
+              copySourceAndUpdateTracker(
+                  renameTracker,
+                  childSourcePath,
+                  key,
+                  sourceAttributes,
+                  readContext,
+                  childDestPath,
+                  newDstKey,
+                  true));
+          bytesCopied.addAndGet(status.getLen());
+          activeCopies.add(copy);
+          if (activeCopies.size() == renameParallelLimit) {
+            // the limit of active copies has been reached;
+            // wait for completion or errors to surface.
+            LOG.debug("Waiting for active copies to complete");
+            completeActiveCopies.apply("batch threshold reached");
           }
-          // Ancestor directories may not be listed, so we explicitly add them
-          S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
-              keyToQualifiedPath(srcKey), childSrc, childDst, username);
-        }
+          if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
+            // finish ongoing copies then delete all queued keys.
+            // provided the parallel limit is a factor of the max entry
+            // constant, this will not need to block for the copy, and
+            // simply jump straight to the delete.
+            completeActiveCopiesAndDeleteSources.apply("paged delete");
+          }
+        } // end of iteration through the list
-        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
-          removeKeys(keysToDelete, true, false);
-        }
+
+        // await the final set of copies and their deletion
+        // This will notify the renameTracker that these objects
+        // have been deleted.
+        completeActiveCopiesAndDeleteSources.apply("final copy and delete");
+
+        // We moved all the children, now move the top-level dir
+        // Empty directory should have been added as the object summary
+        renameTracker.moveSourceDirectory();
       }
-      if (!keysToDelete.isEmpty()) {
-        removeKeys(keysToDelete, false, false);
+    } catch (AmazonClientException | IOException ex) {
+      // rename failed.
+      // block for all ongoing copies to complete, successfully or not
+      try {
+        completeActiveCopies.apply("failure handling");
+      } catch (IOException e) {
+        // a failure to update the metastore after a rename failure is what
+        // we'd see on a network problem, expired credentials and other
+        // unrecoverable errors.
+        // Downgrading to warn because an exception is already
+        // about to be thrown.
+        LOG.warn("While completing all active copies", e);
       }
-      // We moved all the children, now move the top-level dir
-      // Empty directory should have been added as the object summary
-      if (hasMetadataStore()
-          && srcPaths != null
-          && !srcPaths.contains(src)) {
-        LOG.debug("To move the non-empty top-level dir src={} and dst={}",
-            src, dst);
-        S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, src, dst,
-            username);
-      }
+      // notify the rename tracker of the failure
+      throw renameTracker.renameFailed(ex);
     }
-    metadataStore.move(srcPaths, dstMetas);
+    // At this point the rename has completed in the S3 store.
+    // Tell the metastore this fact and let it complete its changes
+    renameTracker.completeRename();

     if (!src.getParent().equals(dst.getParent())) {
       LOG.debug("source & dest parents are different; fix up dir markers");
       deleteUnnecessaryFakeDirectories(dst.getParent());
       maybeCreateFakeParentDirectory(src);
     }
-    return true;
+    return bytesCopied.get();
+  }
+
+  /**
+   * Remove source objects and update the metastore by way of
+   * the rename tracker.
+   * @param renameTracker rename tracker to update.
+   * @param keysToDelete list of keys to delete
+   * @param pathsToDelete list of paths matching the keys to delete 1:1.
+   * @throws IOException failure
+   */
+  @Retries.RetryMixed
+  private void removeSourceObjects(
+      final RenameTracker renameTracker,
+      final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
+      final List<Path> pathsToDelete)
+      throws IOException {
+    List<Path> undeletedObjects = new ArrayList<>();
+    try {
+      // remove the keys
+      // this does will update the metastore on a failure, but on

Review comment:
   nit: /does will/does/
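For anyone reading the diff without the surrounding class: the new directory-rename loop interleaves two batching thresholds, a parallel-copy limit and a bulk-delete page size, plus a failure path that drains in-flight copies before rethrowing. Below is a minimal, self-contained Java sketch of that control flow only. The class name, the constant values, and the stubbed copy/delete bodies are assumptions made for illustration; this is not the S3AFileSystem implementation.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative stand-in for the batching pattern in the patch. */
public class RenameBatchingSketch {

  // Hypothetical values; only the divisibility matters: the parallel
  // limit divides the delete page size, so a paged delete never has
  // to wait for in-flight copies.
  private static final int RENAME_PARALLEL_LIMIT = 10;
  private static final int MAX_ENTRIES_TO_DELETE = 1000;

  private final ExecutorService boundedThreadPool =
      Executors.newFixedThreadPool(RENAME_PARALLEL_LIMIT);
  private final List<CompletableFuture<String>> activeCopies =
      new ArrayList<>(RENAME_PARALLEL_LIMIT);
  private final List<String> keysToDelete = new ArrayList<>();

  /** Block until every queued copy has finished, then reset the list. */
  private void completeActiveCopies(String reason) {
    System.out.printf("Waiting for %d active copies: %s%n",
        activeCopies.size(), reason);
    // join() rethrows any copy failure as a CompletionException,
    // which the caller's catch block turns into a rename failure.
    CompletableFuture.allOf(
        activeCopies.toArray(new CompletableFuture[0])).join();
    activeCopies.clear();
  }

  /** Drain in-flight copies, then delete the queued keys as one page. */
  private void completeActiveCopiesAndDeleteSources(String reason) {
    completeActiveCopies(reason);
    System.out.printf("Bulk delete of %d keys%n", keysToDelete.size());
    keysToDelete.clear();
  }

  /** "Rename" each key: copy asynchronously, queue the source delete. */
  public void rename(List<String> sourceKeys) {
    try {
      for (String key : sourceKeys) {
        // queue the copy operation for execution in the thread pool
        activeCopies.add(CompletableFuture.supplyAsync(
            () -> key + ".copied", boundedThreadPool));
        keysToDelete.add(key);
        if (activeCopies.size() == RENAME_PARALLEL_LIMIT) {
          completeActiveCopies("batch threshold reached");
        }
        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
          completeActiveCopiesAndDeleteSources("paged delete");
        }
      }
      // final flush of copies and deletions
      completeActiveCopiesAndDeleteSources("final copy and delete");
    } catch (RuntimeException ex) {
      // as in the patch: let outstanding copies finish before
      // propagating the failure, so nothing is still copying when
      // rollback begins.
      try {
        completeActiveCopies("failure handling");
      } catch (RuntimeException e) {
        System.err.println("While completing all active copies: " + e);
      }
      throw ex;
    } finally {
      boundedThreadPool.shutdown();
    }
  }
}

Because RENAME_PARALLEL_LIMIT divides MAX_ENTRIES_TO_DELETE here, the paged-delete branch always finds the copy list already empty, which is the property the in-diff comment about the parallel limit being "a factor of the max entry constant" relies on.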