This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 5c6bd951bd58bb638bd1284baeafb05ba6719cc9 Author: lijinglun <lijing...@bytedance.com> AuthorDate: Wed Oct 16 21:29:03 2024 +0800 Integration of TOS: Add FsOps. 1. Add FsOps, DefaultFsOps, DirectoryFsOps, RenameOp. 2. Add RawFileStatus. --- .../{TosFileSystem.java => RawFileStatus.java} | 30 +- .../org/apache/hadoop/fs/tosfs/RawFileSystem.java | 766 +++++++++++++++++++++ .../org/apache/hadoop/fs/tosfs/conf/ConfKeys.java | 38 + .../apache/hadoop/fs/tosfs/ops/DefaultFsOps.java | 186 +++++ .../apache/hadoop/fs/tosfs/ops/DirectoryFsOps.java | 107 +++ .../java/org/apache/hadoop/fs/tosfs/ops/FsOps.java | 88 +++ .../org/apache/hadoop/fs/tosfs/ops/RenameOp.java | 221 ++++++ 7 files changed, 1435 insertions(+), 1 deletion(-) diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileStatus.java similarity index 51% rename from hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java rename to hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileStatus.java index 47c9096096a..99b40b78349 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileStatus.java @@ -18,5 +18,33 @@ package org.apache.hadoop.fs.tosfs; -public class TosFileSystem { +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.object.Constants; + +public class RawFileStatus extends FileStatus { + private final byte[] checksum; + + /** + * File status of directory + * + * @param path directory path + * @param owner directory owner + */ + public RawFileStatus(Path path, String owner) { + this(0, true, 1, System.currentTimeMillis(), path, owner, Constants.MAGIC_CHECKSUM); + } + + public RawFileStatus( + long length, boolean isdir, long blocksize, + long modification_time, Path path, String owner, byte[] checksum) { + super(length, isdir, 1, blocksize, modification_time, path); + setOwner(owner); + setGroup(owner); + this.checksum = checksum; + } + + public byte[] checksum() { + return checksum; + } } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java new file mode 100644 index 00000000000..ce3b85ab340 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java @@ -0,0 +1,766 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs; + +import io.proton.commit.MagicOutputStream; +import io.proton.common.conf.Conf; +import io.proton.common.conf.ConfKeys; +import io.proton.common.object.DirectoryStorage; +import io.proton.common.object.ObjectInfo; +import io.proton.common.object.ObjectMultiRangeInputStream; +import io.proton.common.object.ObjectOutputStream; +import io.proton.common.object.ObjectRangeInputStream; +import io.proton.common.object.ObjectStorage; +import io.proton.common.object.ObjectStorageFactory; +import io.proton.common.object.ObjectUtils; +import io.proton.common.object.exceptions.InvalidObjectKeyException; +import io.proton.common.util.Bytes; +import io.proton.common.util.Constants; +import io.proton.common.util.FSUtils; +import io.proton.common.util.FuseUtils; +import io.proton.common.util.HadoopUtil; +import io.proton.common.util.Range; +import io.proton.common.util.RemoteIterators; +import io.proton.common.util.ThreadPools; +import io.proton.fs.ops.DefaultFsOps; +import io.proton.fs.ops.DirectoryFsOps; +import io.proton.fs.ops.FsOps; +import io.proton.shaded.com.google.common.annotations.VisibleForTesting; +import io.proton.shaded.com.google.common.base.Preconditions; +import io.proton.shaded.com.google.common.collect.Iterators; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.ParentNotDirectoryException; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hadoop.fs.XAttrSetFlag.CREATE; +import static org.apache.hadoop.fs.XAttrSetFlag.REPLACE; + +public class RawFileSystem extends FileSystem { + private static final Logger LOG = LoggerFactory.getLogger(RawFileSystem.class); + private static final String MULTIPART_THREAD_POOL_PREFIX = "rawfs-multipart-thread-pool"; + private static final String TASK_THREAD_POOL_PREFIX = "rawfs-task-thread-pool"; + // This is the same as HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, we do not + // use that directly because we don't want to introduce the hdfs client library. 
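Note on the two block-size constants that follow: getDefaultBlockSize() in this class resolves "dfs.blocksize" through Configuration#getLongBytes, so size suffixes such as "256m" are honoured and the 128 MiB constant only acts as a fallback. A minimal sketch of that lookup against a bare Hadoop Configuration (illustrative only, not part of this patch; the class name is made up):

import org.apache.hadoop.conf.Configuration;

public final class BlockSizeSketch {
  public static void main(String[] args) {
    // Empty configuration, no default resources loaded.
    Configuration conf = new Configuration(false);
    // Key unset: falls back to the same 128 MiB default used by RawFileSystem.
    System.out.println(conf.getLongBytes("dfs.blocksize", 128L << 20));   // 134217728
    // getLongBytes understands binary size suffixes.
    conf.set("dfs.blocksize", "256m");
    System.out.println(conf.getLongBytes("dfs.blocksize", 128L << 20));   // 268435456
  }
}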
+ private static final String DFS_BLOCK_SIZE_KEY = "dfs.blocksize"; + private static final long DFS_BLOCK_SIZE_DEFAULT = 128 << 20; + + private String scheme; + private Conf protonConf; + private String username; + private Path workingDir; + private URI uri; + private String bucket; + private ObjectStorage storage; + // Use for task parallel execution, such as parallel to copy multiple files. + private ExecutorService taskThreadPool; + // Use for file multipart upload only. + private ExecutorService uploadThreadPool; + private FsOps fsOps; + + @Override + public URI getUri() { + return uri; + } + + @Override + public String getScheme() { + return scheme; + } + + @VisibleForTesting + String bucket() { + return bucket; + } + + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + LOG.debug("Opening '{}' for reading.", path); + FileStatus status = innerFileStatus(path); + if (status.isDirectory()) { + throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path)); + } + + // Parse the range size from the hadoop conf. + long rangeSize = getConf().getLong( + ConfKeys.OBJECT_STREAM_RANGE_SIZE.key(), + ConfKeys.OBJECT_STREAM_RANGE_SIZE.defaultValue()); + Preconditions.checkArgument(rangeSize > 0, "Object storage range size must be positive."); + + FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path, status.getLen(), rangeSize); + return new FSDataInputStream(fsIn); + } + + public FSDataInputStream open(Path path, String expectedChecksum, Range range) throws IOException { + LOG.debug("Opening '{}' for reading.", path); + RawFileStatus status = innerFileStatus(path); + if (status.isDirectory()) { + throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path)); + } + + if (expectedChecksum != null && !Objects.equals(status.checksum(), expectedChecksum)) { + throw new ChecksumMismatchException(String.format("The requested file has been staled, " + + "request version's checksum is %s " + + "while current version's checksum is %s", expectedChecksum, status.checksum())); + } + + return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range)); + } + + @Override + public FSDataOutputStream create( + Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + FileStatus fileStatus = getFileStatusOrNull(path); + if (fileStatus != null) { + if (fileStatus.isDirectory()) { + throw new FileAlreadyExistsException(path + " is a directory"); + } + + if (!overwrite) { + throw new FileAlreadyExistsException(path + " already exists"); + } + LOG.debug("Overwriting file {}", path); + } + + if (MagicOutputStream.isMagic(path)) { + return new FSDataOutputStream( + new MagicOutputStream(this, storage, uploadThreadPool, protonConf, makeQualified(path)), null); + } else { + ObjectOutputStream out = + new ObjectOutputStream(storage, uploadThreadPool, protonConf, makeQualified(path), true); + + if (fileStatus == null && FuseUtils.fuseEnabled()) { + // The fuse requires the file to be visible when accessing getFileStatus once we created the file, so here we + // close and commit the file to be visible explicitly for fuse, and then reopen the file output stream for + // further data bytes writing. 
For more details please see: https://code.byted.org/emr/proton/issues/825 + out.close(); + out = new ObjectOutputStream(storage, uploadThreadPool, protonConf, makeQualified(path), true); + } + + return new FSDataOutputStream(out, null); + } + } + + @Override + public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException { + Path path = makeQualified(f); + LOG.debug("listFiles({}, {})", path, recursive); + + // assume the path is a dir at first, and list sub files + RemoteIterator<LocatedFileStatus> subFiles = RemoteIterators.fromIterable( + fsOps.listDir(path, recursive, key -> !ObjectInfo.isDir(key)), this::toLocatedFileStatus); + if (!subFiles.hasNext()) { + final RawFileStatus fileStatus = innerFileStatus(path); + if (fileStatus.isFile()) { + return RemoteIterators.fromSingleton(toLocatedFileStatus(fileStatus)); + } + } + return subFiles; + } + + private RawLocatedFileStatus toLocatedFileStatus(RawFileStatus status) throws IOException { + return new RawLocatedFileStatus(status, + status.isFile() ? getFileBlockLocations(status, 0, status.getLen()) : null); + } + + @Override + public FSDataOutputStream createNonRecursive( + Path path, + FsPermission permission, + EnumSet<CreateFlag> flag, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + Path qualified = makeQualified(path); + return create(qualified, permission, flag.contains(CreateFlag.OVERWRITE), + bufferSize, replication, blockSize, progress); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { + throw new IOException("Not supported"); + } + + /** + * Rename src path to dest path, if dest path is an existed dir, + * then FS will rename the src path under the dst dir. + * E.g. rename('/a/b', '/a/c') and dest 'c' is an existed dir, + * then the source path '/a/b' will be renamed with dest path '/a/b/c' internally. + * + * <ul> + * <li>Return false if src doesn't exist</li> + * <li>Return false if src is root</li> + * <li>Return false if dst path is under src path, e.g. rename('/a/b', '/a/b/c')</li> + * <li>Return false if dst path already exists</li> + * <li>Return true if rename('/a/b', '/a/b') and 'b' is an existed file</li> + * <li>Return true if rename('/a/b', '/a') and 'a' is an existed dir, + * fs will rename '/a/b' to '/a/b' internally</li> + * <li>Return false if rename('/a/b', '/a/b') and 'b' is an existed dir, + * because fs will try to rename '/a/b' to '/a/b/b', which is under '/a/b', this behavior is forbidden.</li> + * </ul> + * + * @param src path to be renamed + * @param dst path after rename + * @return true if rename is successful + * @throws IOException on failure + */ + @Override + public boolean rename(Path src, Path dst) throws IOException { + LOG.debug("Rename source path {} to dest path {}", src, dst); + + // 1. Check source and destination path + Future<FileStatus> srcStatusFuture = taskThreadPool.submit(() -> checkAndGetSrcStatus(src)); + Future<Path> destPathFuture = taskThreadPool.submit(() -> checkAndGetDstPath(src, dst)); + + FileStatus srcStatus; + Path dstPath; + try { + srcStatus = srcStatusFuture.get(); + dstPath = destPathFuture.get(); + + if (src.equals(dstPath)) { + return true; + } + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to rename path, src: {}, dst: {}", src, dst, e); + return false; + } + + // 2. 
Start copy source to destination + if (srcStatus.isDirectory()) { + fsOps.renameDir(srcStatus.getPath(), dstPath); + } else { + fsOps.renameFile(srcStatus.getPath(), dstPath, srcStatus.getLen()); + } + + return true; + } + + private Path checkAndGetDstPath(Path src, Path dest) throws IOException { + FileStatus destStatus = getFileStatusOrNull(dest); + // 1. Rebuilding the destination path + Path finalDstPath = dest; + if (destStatus != null && destStatus.isDirectory()) { + finalDstPath = new Path(dest, src.getName()); + } + + // 2. No need to check the dest path because renaming itself is allowed. + if (src.equals(finalDstPath)) { + return finalDstPath; + } + + // 3. Ensure the source path cannot be the ancestor of destination path. + if (RawFSUtils.inSubtree(src, finalDstPath)) { + throw new IOException(String.format("Failed to rename since it is prohibited to " + + "rename dest path %s under src path %s", finalDstPath, src)); + } + + // 4. Ensure the destination path doesn't exist. + FileStatus finalDstStatus = destStatus; + if (destStatus != null && destStatus.isDirectory()) { + finalDstStatus = getFileStatusOrNull(finalDstPath); + } + if (finalDstStatus != null) { + throw new FileAlreadyExistsException( + String.format("Failed to rename since the dest path %s already exists.", finalDstPath)); + } else { + return finalDstPath; + } + } + + private FileStatus checkAndGetSrcStatus(Path src) throws IOException { + // throw FileNotFoundException if src not found. + FileStatus srcStatus = innerFileStatus(src); + + if (src.isRoot()) { + throw new IOException(String.format("Cannot rename the root directory %s to another name", src)); + } + return srcStatus; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + LOG.debug("Delete path {} - recursive {}", f, recursive); + try { + FileStatus fileStatus = getFileStatus(f); + Path path = fileStatus.getPath(); + + if (path.isRoot()) { + return deleteRoot(path, recursive); + } else { + if (fileStatus.isDirectory()) { + fsOps.deleteDir(path, recursive); + } else { + fsOps.deleteFile(path); + } + return true; + } + } catch (FileNotFoundException e) { + LOG.debug("Couldn't delete {} - does not exist", f); + return false; + } + } + + /** + * Reject deleting root directory and implement the specific logic to compatible with + * AbstractContractRootDirectoryTest rm test cases. + * + * @param root the root path. + * @param recursive indicate whether delete directory recursively + * @return true if root directory is empty, false if trying to delete a non-empty dir recursively. + * @throws IOException if trying to delete the non-empty root dir non-recursively. + */ + private boolean deleteRoot(Path root, boolean recursive) throws IOException { + LOG.info("Delete the {} root directory of {}", bucket, recursive); + boolean isEmptyDir = fsOps.isEmptyDirectory(root); + if (isEmptyDir) { + return true; + } + if (recursive) { + // AbstractContractRootDirectoryTest#testRmRootRecursive doesn't expect any exception if trying to delete a + // non-empty root directory recursively, so we have to return false here instead of throwing a IOException. + return false; + } else { + // AbstractContractRootDirectoryTest#testRmNonEmptyRootDirNonRecursive expect a exception if trying to delete a + // non-empty root directory non-recursively, so we have to throw a IOException instead of returning false. 
+ throw new PathIOException(bucket, "Cannot delete root path"); + } + } + + @Override + public RawFileStatus[] listStatus(Path f) throws IOException { + LOG.debug("List status for path: {}", f); + return Iterators.toArray(listStatus(f, false), RawFileStatus.class); + } + + public Iterator<RawFileStatus> listStatus(Path f, boolean recursive) throws IOException { + Path path = makeQualified(f); + // Assuming path is a dir at first. + Iterator<RawFileStatus> iterator = fsOps.listDir(path, recursive, key -> true).iterator(); + if (iterator.hasNext()) { + return iterator; + } else { + RawFileStatus fileStatus = innerFileStatus(path); + if (fileStatus.isFile()) { + return Collections.singletonList(fileStatus).iterator(); + } else { + // The path is an empty dir. + return Collections.emptyIterator(); + } + } + } + + @Override + public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException { + // We expect throw FileNotFoundException if the path doesn't exist during creating the RemoteIterator instead of + // throwing FileNotFoundException during call hasNext method. + + // The follow RemoteIterator is as same as {@link FileSystem#DirListingIterator} above hadoop 3.2.2, + // but below 3.2.2, the DirListingIterator fetches the directory entries during call hasNext method instead of + // create the DirListingIterator instance. + return new RemoteIterator<FileStatus>() { + private DirectoryEntries entries = listStatusBatch(p, null); + private int index = 0; + + @Override + public boolean hasNext() { + return index < entries.getEntries().length || entries.hasMore(); + } + + private void fetchMore() throws IOException { + byte[] token = entries.getToken(); + entries = listStatusBatch(p, token); + index = 0; + } + + @Override + public FileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException("No more items in iterator"); + } else { + if (index == entries.getEntries().length) { + fetchMore(); + if (!hasNext()) { + throw new NoSuchElementException("No more items in iterator"); + } + } + + return entries.getEntries()[index++]; + } + } + }; + } + + public static long dateToLong(final Date date) { + return date == null ? 
0L : date.getTime(); + } + + @Override + public Path getWorkingDirectory() { + return workingDir; + } + + @Override + public void setWorkingDirectory(Path new_dir) { + this.workingDir = new_dir; + } + + @Override + public boolean mkdirs(Path path, FsPermission permission) throws IOException { + try { + FileStatus fileStatus = innerFileStatus(path); + if (fileStatus.isDirectory()) { + return true; + } else { + throw new FileAlreadyExistsException("Path is a file: " + path); + } + } catch (FileNotFoundException e) { + Path dir = makeQualified(path); + validatePath(dir); + fsOps.mkdirs(dir); + } + return true; + } + + private void validatePath(Path path) throws IOException { + Path parent = path.getParent(); + do { + try { + FileStatus fileStatus = innerFileStatus(parent); + if (fileStatus.isDirectory()) { + // If path exists and a directory, exit + break; + } else { + throw new FileAlreadyExistsException(String.format("Can't make directory for path '%s', it is a file.", + parent)); + } + } catch (FileNotFoundException ignored) { + } + parent = parent.getParent(); + } while (parent != null); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + try { + return innerFileStatus(path); + } catch (ParentNotDirectoryException e) { + // Treat ParentNotDirectoryException as FileNotFoundException for the case that check whether path exist or not. + throw new FileNotFoundException(e.getMessage()); + } + } + + + /** + * Get the file status of given path. + * + * @param f the path + * @return {@link RawFileStatus} describe file status info. + * @throws FileNotFoundException if the path doesn't exist. + * @throws ParentNotDirectoryException if the path is locating under an existing file, which is not allowed + * in directory bucket case. + */ + RawFileStatus innerFileStatus(Path f) throws ParentNotDirectoryException, FileNotFoundException { + Path qualifiedPath = f.makeQualified(uri, workingDir); + RawFileStatus fileStatus = getFileStatusOrNull(qualifiedPath); + if (fileStatus == null) { + throw new FileNotFoundException(String.format("No such file or directory: %s", qualifiedPath)); + } + return fileStatus; + } + + /** + * The different with {@link RawFileSystem#getFileStatus(Path)} is that: + * 1. throw {@link ParentNotDirectoryException} if the path is locating under an existing file in directory bucket + * case, but {@link RawFileSystem#getFileStatus(Path)} will ignore whether the invalid path and + * throw {@link FileNotFoundException} + * 2. return null if the path doesn't exist instead of throwing {@link FileNotFoundException}. + * + * @param path the object path. + * @return null if the path doesn't exist. + * @throws ParentNotDirectoryException if the path is locating under an existing file, which is not allowed + * in directory bucket case. 
+ */ + public RawFileStatus getFileStatusOrNull(final Path path) throws ParentNotDirectoryException { + Path qualifiedPath = path.makeQualified(uri, workingDir); + String key = ObjectUtils.pathToKey(qualifiedPath); + + // Root directory always exists + if (key.isEmpty()) { + return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.EMPTY_CHECKSUM); + } + + try { + ObjectInfo obj = storage.objectStatus(key); + if (obj == null) { + return null; + } else { + return objectToFileStatus(obj); + } + } catch (InvalidObjectKeyException e) { + String msg = String.format("The object key %s is a invalid key, detail: %s", key, e.getMessage()); + throw new ParentNotDirectoryException(msg); + } + } + + private RawFileStatus objectToFileStatus(ObjectInfo obj) { + Path keyPath = makeQualified(ObjectUtils.keyToPath(obj.key())); + long blockSize = obj.isDir() ? 0 : getDefaultBlockSize(keyPath); + long modificationTime = dateToLong(obj.mtime()); + return new RawFileStatus(obj.size(), obj.isDir(), blockSize, modificationTime, keyPath, username, obj.checksum()); + } + + @Override + @Deprecated + public long getDefaultBlockSize() { + return getConf().getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); + } + + @Override + public FsServerDefaults getServerDefaults(Path p) throws IOException { + Configuration config = getConf(); + // CRC32 is chosen as default as it is available in all + // releases that support checksum. + // The client trash configuration is ignored. + return new FsServerDefaults(getDefaultBlockSize(), + config.getInt("dfs.bytes-per-checksum", 512), + 64 * 1024, + getDefaultReplication(), + config.getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, + CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT), + false, + CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT, + DataChecksum.Type.CRC32, + ""); + } + + private void stopAllServices() { + HadoopUtil.shutdownHadoopExecutors(uploadThreadPool, LOG, 30, TimeUnit.SECONDS); + HadoopUtil.shutdownHadoopExecutors(taskThreadPool, LOG, 30, TimeUnit.SECONDS); + } + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + setConf(conf); + this.scheme = FSUtils.scheme(conf, uri); + + // Merge the deprecated configure keys with the new configure keys and convert hadoop conf to proton conf. + FSUtils.withCompatibleKeys(conf, scheme); + this.protonConf = Conf.copyOf(conf); + + // Username is the current user at the time the FS was instantiated. 
+ this.username = UserGroupInformation.getCurrentUser().getShortUserName(); + this.workingDir = new Path("/user", username).makeQualified(uri, null); + this.uri = URI.create(scheme + "://" + uri.getAuthority()); + this.bucket = this.uri.getAuthority(); + this.storage = ObjectStorageFactory.create(scheme, bucket, protonConf); + if (storage.bucket() == null) { + throw new FileNotFoundException(String.format("Bucket: %s not found.", uri.getAuthority())); + } + + int taskThreadPoolSize = protonConf.get(ConfKeys.TASK_THREAD_POOL_SIZE.format(scheme)); + this.taskThreadPool = ThreadPools.newWorkerPool(TASK_THREAD_POOL_PREFIX, taskThreadPoolSize); + + int uploadThreadPoolSize = protonConf.get(ConfKeys.MULTIPART_THREAD_POOL_SIZE.format(scheme)); + this.uploadThreadPool = ThreadPools.newWorkerPool(MULTIPART_THREAD_POOL_PREFIX, uploadThreadPoolSize); + + if (storage.bucket().isDirectory()) { + + fsOps = new DirectoryFsOps((DirectoryStorage) storage, this::objectToFileStatus); + } else { + fsOps = new DefaultFsOps(storage, protonConf, taskThreadPool, this::objectToFileStatus); + } + } + + @Override + public void close() throws IOException { + try { + super.close(); + storage.close(); + } finally { + stopAllServices(); + } + } + + public ObjectStorage storage() { + return storage; + } + + public ExecutorService uploadThreadPool() { + return uploadThreadPool; + } + + String username() { + return username; + } + + /** + * @return null if checksum is not supported. + */ + @Override + public FileChecksum getFileChecksum(Path f, long length) throws IOException { + Preconditions.checkArgument(length >= 0); + RawFileStatus fileStatus = innerFileStatus(f); + if (fileStatus.isDirectory()) { + // Compatible with HDFS + throw new FileNotFoundException(String.format("Path is not a file, %s", f)); + } + if (!protonConf.get(ConfKeys.CHECKSUM_ENABLED.format(scheme))) { + return null; + } + + return BaseChecksum.create(protonConf, fileStatus, length); + } + + @Override + public String getCanonicalServiceName() { + return null; + } + + @Override + public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException { + Preconditions.checkNotNull(name, "xAttr name must not be null."); + Preconditions.checkArgument(!name.isEmpty(), "xAttr name must not be empty."); + Preconditions.checkNotNull(value, "xAttr value must not be null."); + + if (getFileStatus(path).isFile()) { + Path qualifiedPath = path.makeQualified(uri, workingDir); + String key = ObjectUtils.pathToKey(qualifiedPath); + + Map<String, String> existedTags = storage.getTags(key); + validateXAttrFlag(name, existedTags.containsKey(name), flag); + + String newValue = Bytes.toString(value); + String previousValue = existedTags.put(name, newValue); + if (!newValue.equals(previousValue)) { + storage.putTags(key, existedTags); + } + } + } + + @Override + public Map<String, byte[]> getXAttrs(Path path) throws IOException { + if (getFileStatus(path).isDirectory()) { + return new HashMap<>(); + } else { + Path qualifiedPath = path.makeQualified(uri, workingDir); + String key = ObjectUtils.pathToKey(qualifiedPath); + + Map<String, String> tags = storage.getTags(key); + return tags.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue()))); + } + } + + @Override + public byte[] getXAttr(Path path, String name) throws IOException { + Map<String, byte[]> xAttrs = getXAttrs(path); + if (xAttrs.containsKey(name)) { + return xAttrs.get(name); + } else { + throw new IOException("Attribute with name " 
+ name + " is not found."); + } + } + + @Override + public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException { + Map<String, byte[]> xAttrs = getXAttrs(path); + xAttrs.keySet().retainAll(names); + if (xAttrs.size() == names.size()) { + return xAttrs; + } else { + List<String> badNames = names.stream().filter(n -> !xAttrs.containsKey(n)).collect(Collectors.toList()); + throw new IOException("Attributes with name " + badNames + " are not found."); + } + } + + @Override + public List<String> listXAttrs(Path path) throws IOException { + return getXAttrs(path).keySet().stream().collect(Collectors.toList()); + } + + @Override + public void removeXAttr(Path path, String name) throws IOException { + if (getFileStatus(path).isFile()) { + Path qualifiedPath = path.makeQualified(uri, workingDir); + String key = ObjectUtils.pathToKey(qualifiedPath); + + Map<String, String> existedTags = storage.getTags(key); + if (existedTags.remove(name) != null) { + storage.putTags(key, existedTags); + } + } + } + + private void validateXAttrFlag(String xAttrName, boolean xAttrExists, EnumSet<XAttrSetFlag> flag) throws IOException { + if (xAttrExists) { + if (!flag.contains(REPLACE)) { + throw new IOException("XAttr: " + xAttrName + " already exists. The REPLACE flag must be specified."); + } + } else { + if (!flag.contains(CREATE)) { + throw new IOException("XAttr: " + xAttrName + " does not exist. The CREATE flag must be specified."); + } + } + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java index a059baaf11e..8534fe5ef74 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java @@ -46,4 +46,42 @@ public class ConfKeys { */ public static final ArgumentKey FS_BATCH_DELETE_SIZE = new ArgumentKey("fs.%s.delete.batch-size"); public static final int FS_BATCH_DELETE_SIZE_DEFAULT = 250; + + /** + * The multipart upload part size of the given object storage, e.g. fs.tos.multipart.size. + */ + public static final String MULTIPART_SIZE = "fs.tos.multipart.size"; + public static final long MULTIPART_SIZE_DEFAULT = 8L << 20; + + /** + * The threshold (larger than this value) to enable multipart upload during copying objects + * in the given object storage. If the copied data size is less than threshold, will copy data via + * executing copyObject instead of uploadPartCopy. E.g. fs.tos.multipart.copy-threshold + */ + public static final String MULTIPART_COPY_THRESHOLD = "fs.tos.multipart.copy-threshold"; + public static final long MULTIPART_COPY_THRESHOLD_DEFAULT = 5L << 20; + + /** + * The batch size of deleting multiple objects per request for the given object storage. + * e.g. fs.tos.delete.batch-size + */ + public static final String BATCH_DELETE_SIZE = "fs.tos.delete.batch-size"; + public static final int BATCH_DELETE_SIZE_DEFAULT = 250; + + /** + * True to create the missed parent dir asynchronously during deleting or renaming a file or dir. + */ + public static final String ASYNC_CREATE_MISSED_PARENT = "fs.tos.missed.parent.dir.async-create"; + public static final boolean ASYNC_CREATE_MISSED_PARENT_DEFAULT = true; + + /** + * Whether using rename semantic of object storage during rename files, otherwise using + * copy + delete. 
+ * Please ensure that the object storage support and enable rename semantic and before enable it, + * and also ensure grant rename permission to the requester. + * If you are using TOS, you have to send putBucketRename request before sending rename request, + * otherwise MethodNotAllowed exception will be thrown. + */ + public static final String OBJECT_RENAME_ENABLED = "fs.tos.rename.enabled"; + public static final boolean OBJECT_RENAME_ENABLED_DEFAULT = false; } diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DefaultFsOps.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DefaultFsOps.java new file mode 100644 index 00000000000..2042f1b986c --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DefaultFsOps.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.ops; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.tosfs.RawFileStatus; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.object.Constants; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest; +import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.thirdparty.com.google.common.base.Function; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Predicate; + +import static org.apache.hadoop.fs.tosfs.object.ObjectUtils.SLASH; + +/** + * Provides rename, delete, list capabilities for general purpose bucket. 
+ */ +public class DefaultFsOps implements FsOps { + private final ObjectStorage storage; + private final ExecutorService taskThreadPool; + private final Function<ObjectInfo, RawFileStatus> objMapper; + private final RenameOp renameOp; + private final boolean asyncCreateParentDir; + + public DefaultFsOps( + ObjectStorage storage, + Configuration conf, + ExecutorService taskThreadPool, + Function<ObjectInfo, RawFileStatus> objMapper) { + this.storage = storage; + this.taskThreadPool = taskThreadPool; + this.objMapper = objMapper; + this.renameOp = new RenameOp(conf, storage, taskThreadPool); + this.asyncCreateParentDir = conf.getBoolean(ConfKeys.ASYNC_CREATE_MISSED_PARENT, + ConfKeys.ASYNC_CREATE_MISSED_PARENT_DEFAULT); + } + + @Override + public void renameFile(Path src, Path dst, long length) { + renameOp.renameFile(src, dst, length); + mkdirIfNecessary(src.getParent(), asyncCreateParentDir); + } + + @Override + public void renameDir(Path src, Path dst) { + renameOp.renameDir(src, dst); + mkdirIfNecessary(src.getParent(), asyncCreateParentDir); + } + + @Override + public void deleteFile(Path file) { + storage.delete(ObjectUtils.pathToKey(file)); + mkdirIfNecessary(file.getParent(), asyncCreateParentDir); + } + + @Override + public void deleteDir(Path dir, boolean recursive) throws IOException { + String dirKey = ObjectUtils.pathToKey(dir, true); + if (recursive) { + storage.deleteAll(dirKey); + } else { + if (isEmptyDirectory(dir)) { + storage.delete(dirKey); + } else { + throw new PathIsNotEmptyDirectoryException(dir.toString()); + } + } + } + + @Override + public Iterable<RawFileStatus> listDir(Path dir, boolean recursive, Predicate<String> postFilter) { + String key = ObjectUtils.pathToKey(dir, true); + String delimiter = recursive ? null : SLASH; + + ListObjectsRequest req = ListObjectsRequest.builder() + .prefix(key) + .startAfter(key) + .delimiter(delimiter) + .build(); + return Iterables.transform(asObjectInfo(storage.list(req), postFilter), objMapper); + } + + @Override + public boolean isEmptyDirectory(Path dir) { + String key = ObjectUtils.pathToKey(dir, true); + ListObjectsRequest req = ListObjectsRequest.builder() + .prefix(key) + .startAfter(key) + .delimiter(SLASH) + .maxKeys(1) + .build(); + return !asObjectInfo(storage.list(req), s -> true).iterator().hasNext(); + } + + @Override + public void mkdirs(Path dir) { + if (dir.isRoot()) { + return; + } + String key = ObjectUtils.pathToKey(dir, true); + storage.put(key, new byte[0]); + + // Create parent dir if missed. + Path parentPath = dir.getParent(); + String parentKey = ObjectUtils.pathToKey(parentPath, true); + while (!parentPath.isRoot() && storage.head(parentKey) == null) { + storage.put(parentKey, new byte[0]); + parentPath = parentPath.getParent(); + parentKey = ObjectUtils.pathToKey(parentPath, true); + } + } + + private void mkdirIfNecessary(Path path, boolean async) { + if (path != null) { + CommonUtils.runQuietly(() -> { + Future<?> future = taskThreadPool.submit(() -> { + String key = ObjectUtils.pathToKey(path, true); + if (!key.isEmpty() && storage.head(key) == null) { + mkdirs(ObjectUtils.keyToPath(key)); + } + }); + + if (!async) { + future.get(); + } + }); + } + } + + /** + * Convert ListObjectResponse iterable to FileStatus iterable, + * using file status acceptor to filter the expected objects and common prefixes. 
+ * + * @param listResponses the iterable of ListObjectsResponse + * @param filter the file status acceptor + * @return the iterable of TosFileStatus + */ + private Iterable<ObjectInfo> asObjectInfo(Iterable<ListObjectsResponse> listResponses, Predicate<String> filter) { + Iterable<List<ObjectInfo>> results = Iterables.transform(listResponses, listResp -> { + List<ObjectInfo> objs = Lists.newArrayList(); + + // Add object files. + objs.addAll(listResp.objects()); + + // Add object directories. + for (String prefix : listResp.commonPrefixes()) { + objs.add(new ObjectInfo(prefix, 0, new Date(), Constants.MAGIC_CHECKSUM, true)); + } + + return objs; + }); + + return Iterables.filter(Iterables.concat(results), o -> filter.test(o.key())); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DirectoryFsOps.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DirectoryFsOps.java new file mode 100644 index 00000000000..3f1d829b03e --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DirectoryFsOps.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.ops; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.tosfs.RawFileStatus; +import org.apache.hadoop.fs.tosfs.object.DirectoryStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.thirdparty.com.google.common.base.Function; +import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables; + +import java.io.IOException; +import java.util.function.Predicate; + +/** + * Provides rename, delete, list capabilities for directory bucket. + */ +public class DirectoryFsOps implements FsOps { + private final DirectoryStorage storage; + private final Function<ObjectInfo, RawFileStatus> objMapper; + + public DirectoryFsOps(DirectoryStorage storage, Function<ObjectInfo, RawFileStatus> objMapper) { + this.storage = storage; + this.objMapper = objMapper; + } + + @Override + public void renameFile(Path src, Path dst, long length) { + innerRename(src, dst, false); + } + + @Override + public void renameDir(Path src, Path dst) { + innerRename(src, dst, true); + } + + private void innerRename(Path src, Path dst, boolean isDir) { + // Need to ensure the dest parent exist before rename file in directory bucket. 
+ String dstParentKey = ObjectUtils.pathToKey(dst.getParent(), true); + if (!dstParentKey.isEmpty() && storage.head(dstParentKey) == null) { + mkdirs(dst.getParent()); + } + + String srcKey = ObjectUtils.pathToKey(src, isDir); + String dstKey = ObjectUtils.pathToKey(dst, isDir); + storage.rename(srcKey, dstKey); + } + + @Override + public void deleteFile(Path file) { + storage.delete(ObjectUtils.pathToKey(file)); + } + + @Override + public void deleteDir(Path dir, boolean recursive) throws IOException { + String dirKey = ObjectUtils.pathToKey(dir, true); + if (recursive) { + storage.deleteAll(dirKey); + } else { + if (isEmptyDirectory(dir)) { + storage.delete(dirKey); + } else { + throw new PathIsNotEmptyDirectoryException(dir.toString()); + } + } + } + + @Override + public Iterable<RawFileStatus> listDir(Path dir, boolean recursive, Predicate<String> postFilter) { + String key = ObjectUtils.pathToKey(dir, true); + Iterable<ObjectInfo> objs = Iterables.filter(storage.listDir(key, recursive), obj -> postFilter.test(obj.key())); + return Iterables.transform(objs, objMapper); + } + + @Override + public boolean isEmptyDirectory(Path dir) { + return storage.isEmptyDir(ObjectUtils.pathToKey(dir, true)); + } + + @Override + public void mkdirs(Path dir) { + if (dir.isRoot()) { + return; + } + String key = ObjectUtils.pathToKey(dir, true); + // Directory bucket will create missed parent dirs automatically. + storage.put(key, new byte[0]); + } +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/FsOps.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/FsOps.java new file mode 100644 index 00000000000..7eca5d2240f --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/FsOps.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.ops; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.RawFileStatus; + +import java.io.IOException; +import java.util.function.Predicate; + +public interface FsOps { + + /** + * Rename file from source path to dest path. + * + * @param src the source path. + * @param dst the dest path. + * @param length the length of source file. + * @throws IOException if any io error happen. + */ + void renameFile(Path src, Path dst, long length) throws IOException; + + /** + * Rename dir from source path to dest path. + * + * @param src the source path. + * @param dst the dest path. + * @throws IOException if any io error happen. + */ + void renameDir(Path src, Path dst) throws IOException; + + /** + * Delete the given file. + * + * @param file the given file path. + * @throws IOException if any io error happen. 
+ */ + void deleteFile(Path file) throws IOException; + + /** + * Delete the given dir. + * + * @param dir the given dir path. + * @param recursive indicate whether delete dir recursively. + * @throws IOException if any io error happen. + */ + void deleteDir(Path dir, boolean recursive) throws IOException; + + /** + * List the sub dirs and files with given dir. + * Return empty collection if the path doesn't exist, or is a file, or is an empty dir. + * + * @param dir the listed path. + * @param recursive indicated whether list all sub dirs/files or not. + * @param postFilter filter the result after getting listing response. + * @return the status of sub dirs and files. + */ + Iterable<RawFileStatus> listDir(Path dir, boolean recursive, Predicate<String> postFilter); + + /** + * @return true if path don't have any children. + */ + boolean isEmptyDirectory(Path dir); + + /** + * Create dir and parent dirs if don't exist. + * + * @param dir the dir to be created. + * @throws IOException if any io error happen. + */ + void mkdirs(Path dir) throws IOException; +} diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/RenameOp.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/RenameOp.java new file mode 100644 index 00000000000..332ba24885e --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/RenameOp.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs.ops; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.tosfs.common.Tasks; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.object.MultipartUpload; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.object.Part; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class RenameOp { + private static final Logger LOG = LoggerFactory.getLogger(RenameOp.class); + private static final int RENAME_RETRY_TIMES = 3; + + private final Configuration conf; + private final ObjectStorage storage; + private final ExecutorService renamePool; + // Whether enable object storage atomic rename object capability. + private final boolean renameObjectEnabled; + + public RenameOp(Configuration conf, ObjectStorage storage, ExecutorService taskThreadPool) { + this.conf = conf; + this.storage = storage; + this.renamePool = taskThreadPool; + this.renameObjectEnabled = + conf.getBoolean(ConfKeys.OBJECT_RENAME_ENABLED, ConfKeys.OBJECT_RENAME_ENABLED_DEFAULT); + } + + public void renameDir(Path src, Path dst) { + String srcKey = ObjectUtils.pathToKey(src, true); + String dstKey = ObjectUtils.pathToKey(dst, true); + renameDir(srcKey, dstKey); + } + + public void renameFile(Path src, Path dst, long length) { + String srcKey = ObjectUtils.pathToKey(src, false); + String dstKey = ObjectUtils.pathToKey(dst, false); + renameFile(srcKey, dstKey, length); + } + + /** + * Renames each object after listing all objects with given src key via renaming semantic if object storage + * supports atomic rename semantic, otherwise renaming all objects via copy & delete. + * + * @param srcKey the source dir key, ending with slash. + * @param dstKey the destination parent dir key, ending with slash. 
+ */ + private void renameDir(String srcKey, String dstKey) { + Iterable<ObjectInfo> objs = storage.listAll(srcKey, ""); + if (renameObjectEnabled) { + Tasks.foreach(objs) + .executeWith(renamePool) + .throwFailureWhenFinished() + .retry(RENAME_RETRY_TIMES) + .revertWith(sourceInfo -> { + String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length()); + String newSrcKey = sourceInfo.key(); + LOG.debug("Try to rollback dest key {} to source key {}", newDstKey, newSrcKey); + + storage.rename(newDstKey, newSrcKey); + }) + .run(sourceInfo -> { + String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length()); + String newSrcKey = sourceInfo.key(); + LOG.debug("Try to rename src key {} to dest key {}", newSrcKey, newDstKey); + + storage.rename(newSrcKey, newDstKey); + }); + } else { + Tasks.foreach(objs) + .executeWith(renamePool) + .throwFailureWhenFinished() + .retry(RENAME_RETRY_TIMES) + .revertWith(sourceInfo -> { + String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length()); + storage.delete(newDstKey); + }) + .run(sourceInfo -> { + String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length()); + LOG.debug("Try to rename src key {} to dest key {}", sourceInfo.key(), newDstKey); + + try { + if (ObjectInfo.isDir(newDstKey)) { + mkdir(newDstKey); + } else { + copyFile(sourceInfo.key(), newDstKey, sourceInfo.size()); + } + } catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to copy source file %s to dest file %s", + sourceInfo.key(), newDstKey), e); + } + }); + + // Delete all the source keys, since we've already copied them into destination keys. + storage.deleteAll(srcKey); + } + } + + private void renameFile(String srcKey, String dstKey, long fileSize) { + if (renameObjectEnabled) { + storage.rename(srcKey, dstKey); + } else { + Tasks.foreach(0) + .throwFailureWhenFinished() + .retry(RENAME_RETRY_TIMES) + .revertWith(obj -> storage.delete(dstKey)) + .run(obj -> { + try { + copyFile(srcKey, dstKey, fileSize); + } catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to copy source file %s to dest file %s", + srcKey, dstKey), e); + } + }); + + Tasks.foreach(0) + .throwFailureWhenFinished() + .retry(RENAME_RETRY_TIMES) + .run(obj -> storage.delete(srcKey)); + } + } + + private void copyFile(String srcKey, String dstKey, long srcSize) throws IOException { + long byteSizePerPart = conf.getLong(ConfKeys.MULTIPART_SIZE, ConfKeys.MULTIPART_SIZE_DEFAULT); + long multiPartCopyThreshold = + conf.getLong(ConfKeys.MULTIPART_COPY_THRESHOLD, ConfKeys.MULTIPART_COPY_THRESHOLD_DEFAULT); + if (srcSize > multiPartCopyThreshold) { + uploadPartCopy(srcKey, srcSize, dstKey, byteSizePerPart); + } else { + storage.copy(srcKey, dstKey); + } + } + + private void uploadPartCopy(String srcKey, long srcSize, String dstKey, long byteSizePerPart) { + final MultipartUpload multipartUpload = storage.createMultipartUpload(dstKey); + try { + Preconditions.checkState(byteSizePerPart >= multipartUpload.minPartSize(), + "Configured upload part size %s must be greater than or equals to the minimal part size %s," + + " please check configure key %s.", + byteSizePerPart, multipartUpload.minPartSize(), ConfKeys.MULTIPART_SIZE.format(storage.scheme())); + + AtomicInteger partNumGetter = new AtomicInteger(0); + List<CompletableFuture<Part>> results = Lists.newArrayList(); + for (long start = 0, end; start < srcSize; start += byteSizePerPart) { + end = Math.min(start + byteSizePerPart, srcSize) - 1; + Preconditions.checkArgument(end >= 0, 
"Invalid copy range start: %s, end: %s", start, end); + // Submit upload part copy task to the thread pool. + CompletableFuture<Part> result = asyncUploadPartCopy(srcKey, multipartUpload, + partNumGetter.incrementAndGet(), start, end); + results.add(result); + } + + // Waiting for all the upload parts to be finished. + List<Part> parts = results.stream() + .map(CompletableFuture::join) + .sorted(Comparator.comparing(Part::num)) + .collect(Collectors.toList()); + + finishUpload(multipartUpload.key(), multipartUpload.uploadId(), parts); + } catch (Exception e) { + LOG.error("Encountering error when upload part copy", e); + CommonUtils.runQuietly(() -> storage.abortMultipartUpload(multipartUpload.key(), multipartUpload.uploadId())); + throw e; + } + } + + protected void finishUpload(String key, String uploadId, List<Part> uploadParts) { + storage.completeUpload(key, uploadId, uploadParts); + } + + private CompletableFuture<Part> asyncUploadPartCopy( + String srcKey, MultipartUpload multipartUpload, int partNum, + long copyRangeStart, long copyRangeEnd) { + return CompletableFuture.supplyAsync(() -> storage.uploadPartCopy(srcKey, multipartUpload.key(), + multipartUpload.uploadId(), partNum, copyRangeStart, copyRangeEnd), renamePool) + .whenComplete((part, err) -> { + if (err != null) { + LOG.error("Failed to upload part copy, src key: {}, multipartUpload: {}, partNum: {}, copy range start: " + + "{}, copy range end: {}", srcKey, multipartUpload, partNum, copyRangeStart, copyRangeEnd, err); + } + }); + } + + private void mkdir(String key) { + storage.put(key, new byte[0]); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org