Repository: hadoop Updated Branches: refs/heads/trunk ef8edab93 -> 5e7cfdca7
HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5e7cfdca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5e7cfdca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5e7cfdca Branch: refs/heads/trunk Commit: 5e7cfdca7b73a88bf3c3f1e5eb794a24218cce52 Parents: ef8edab Author: Lei Xu <[email protected]> Authored: Wed Jun 14 23:17:53 2017 -0700 Committer: Lei Xu <[email protected]> Committed: Thu Jun 15 10:59:24 2017 -0700 ---------------------------------------------------------------------- .../hadoop/fs/FSDataOutputStreamBuilder.java | 171 +++++++++++++----- .../java/org/apache/hadoop/fs/FileSystem.java | 31 +++- .../org/apache/hadoop/fs/FilterFileSystem.java | 4 +- .../org/apache/hadoop/fs/HarFileSystem.java | 4 +- .../apache/hadoop/fs/TestLocalFileSystem.java | 10 +- .../hadoop/hdfs/DistributedFileSystem.java | 173 +++++++++++++++---- .../hdfs/server/balancer/NameNodeConnector.java | 8 +- .../hadoop/hdfs/TestDistributedFileSystem.java | 84 +++++++-- .../hadoop/hdfs/TestErasureCodingPolicies.java | 25 ++- .../namenode/TestFavoredNodesEndToEnd.java | 2 +- 10 files changed, 390 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java index 55836cc..0527202 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java @@ -18,36 +18,70 @@ package org.apache.hadoop.fs; import com.google.common.base.Preconditions; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.Progressable; +import javax.annotation.Nonnull; import java.io.IOException; import java.util.EnumSet; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; -/** Base of specific file system FSDataOutputStreamBuilder. */ +/** + * Builder for {@link FSDataOutputStream} and its subclasses. + * + * It is used to create {@link FSDataOutputStream} when creating a new file or + * appending an existing file on {@link FileSystem}. + * + * By default, it does not create parent directory that do not exist. + * {@link FileSystem#createNonRecursive(Path, boolean, int, short, long, + * Progressable)}. + * + * To create missing parent directory, use {@link #recursive()}. + */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class FSDataOutputStreamBuilder { - private Path path = null; +public abstract class FSDataOutputStreamBuilder + <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> { + private final FileSystem fs; + private final Path path; private FsPermission permission = null; - private Integer bufferSize; - private Short replication; - private Long blockSize; + private int bufferSize; + private short replication; + private long blockSize; + /** set to true to create missing directory. */ + private boolean recursive = false; + private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class); private Progressable progress = null; - private EnumSet<CreateFlag> flags = null; private ChecksumOpt checksumOpt = null; - private final FileSystem fs; - - protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) { + /** + * Return the concrete implementation of the builder instance. + */ + protected abstract B getThisBuilder(); + + /** + * Constructor. + */ + protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem, + @Nonnull Path p) { + Preconditions.checkNotNull(fileSystem); + Preconditions.checkNotNull(p); fs = fileSystem; path = p; + bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, + IO_FILE_BUFFER_SIZE_DEFAULT); + replication = fs.getDefaultReplication(path); + blockSize = fs.getDefaultBlockSize(p); + } + + protected FileSystem getFS() { + return fs; } protected Path getPath() { @@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder { protected FsPermission getPermission() { if (permission == null) { - return FsPermission.getFileDefault(); + permission = FsPermission.getFileDefault(); } return permission; } - public FSDataOutputStreamBuilder setPermission(final FsPermission perm) { + /** + * Set permission for the file. + */ + public B permission(@Nonnull final FsPermission perm) { Preconditions.checkNotNull(perm); permission = perm; - return this; + return getThisBuilder(); } protected int getBufferSize() { - if (bufferSize == null) { - return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, - IO_FILE_BUFFER_SIZE_DEFAULT); - } return bufferSize; } - public FSDataOutputStreamBuilder setBufferSize(int bufSize) { + /** + * Set the size of the buffer to be used. + */ + public B bufferSize(int bufSize) { bufferSize = bufSize; - return this; + return getThisBuilder(); } protected short getReplication() { - if (replication == null) { - return fs.getDefaultReplication(getPath()); - } return replication; } - public FSDataOutputStreamBuilder setReplication(short replica) { + /** + * Set replication factor. + */ + public B replication(short replica) { replication = replica; - return this; + return getThisBuilder(); } protected long getBlockSize() { - if (blockSize == null) { - return fs.getDefaultBlockSize(getPath()); - } return blockSize; } - public FSDataOutputStreamBuilder setBlockSize(long blkSize) { + /** + * Set block size. + */ + public B blockSize(long blkSize) { blockSize = blkSize; - return this; + return getThisBuilder(); + } + + /** + * Return true to create the parent directories if they do not exist. + */ + protected boolean isRecursive() { + return recursive; + } + + /** + * Create the parent directory if they do not exist. + */ + public B recursive() { + recursive = true; + return getThisBuilder(); } protected Progressable getProgress() { return progress; } - public FSDataOutputStreamBuilder setProgress(final Progressable prog) { + /** + * Set the facility of reporting progress. + */ + public B progress(@Nonnull final Progressable prog) { Preconditions.checkNotNull(prog); progress = prog; - return this; + return getThisBuilder(); } protected EnumSet<CreateFlag> getFlags() { - if (flags == null) { - return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); - } return flags; } - public FSDataOutputStreamBuilder setFlags( - final EnumSet<CreateFlag> enumFlags) { - Preconditions.checkNotNull(enumFlags); - flags = enumFlags; - return this; + /** + * Create an FSDataOutputStream at the specified path. + */ + public B create() { + flags.add(CreateFlag.CREATE); + return getThisBuilder(); + } + + /** + * Set to true to overwrite the existing file. + * Set it to false, an exception will be thrown when calling {@link #build()} + * if the file exists. + */ + public B overwrite(boolean overwrite) { + if (overwrite) { + flags.add(CreateFlag.OVERWRITE); + } else { + flags.remove(CreateFlag.OVERWRITE); + } + return getThisBuilder(); + } + + /** + * Append to an existing file (optional operation). + */ + public B append() { + flags.add(CreateFlag.APPEND); + return getThisBuilder(); } protected ChecksumOpt getChecksumOpt() { return checksumOpt; } - public FSDataOutputStreamBuilder setChecksumOpt( - final ChecksumOpt chksumOpt) { + /** + * Set checksum opt. + */ + public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) { Preconditions.checkNotNull(chksumOpt); checksumOpt = chksumOpt; - return this; + return getThisBuilder(); } - public FSDataOutputStream build() throws IOException { - return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(), - getReplication(), getBlockSize(), getProgress(), getChecksumOpt()); - } + /** + * Create the FSDataOutputStream to write on the file system. + * + * @throws HadoopIllegalArgumentException if the parameters are not valid. + * @throws IOException on errors when file system creates or appends the file. + */ + public abstract S build() throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 1907475..cc92f31 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4140,8 +4140,34 @@ public abstract class FileSystem extends Configured implements Closeable { return GlobalStorageStatistics.INSTANCE; } + private static final class FileSystemDataOutputStreamBuilder extends + FSDataOutputStreamBuilder<FSDataOutputStream, + FileSystemDataOutputStreamBuilder> { + + /** + * Constructor. + */ + protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) { + super(fileSystem, p); + } + + @Override + public FSDataOutputStream build() throws IOException { + return getFS().create(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), getProgress(), + getChecksumOpt()); + } + + @Override + protected FileSystemDataOutputStreamBuilder getThisBuilder() { + return this; + } + } + /** * Create a new FSDataOutputStreamBuilder for the file with path. + * Files are overwritten by default. + * * @param path file path * @return a FSDataOutputStreamBuilder object to build the file * @@ -4149,7 +4175,8 @@ public abstract class FileSystem extends Configured implements Closeable { * builder interface becomes stable. */ @InterfaceAudience.Private - protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { - return new FSDataOutputStreamBuilder(this, path); + protected FSDataOutputStreamBuilder createFile(Path path) { + return new FileSystemDataOutputStreamBuilder(this, path) + .create().overwrite(true); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java index 3466922..e940065 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java @@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem { } @Override - protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { - return fs.newFSDataOutputStreamBuilder(path); + public FSDataOutputStreamBuilder createFile(Path path) { + return fs.createFile(path); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java index 7e50ab1..c410e34 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java @@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem { } @Override - public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { - return fs.newFSDataOutputStreamBuilder(path); + public FSDataOutputStreamBuilder createFile(Path path) { + return fs.createFile(path); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java index 777e5c0..527b9eb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java @@ -659,9 +659,9 @@ public class TestLocalFileSystem { try { FSDataOutputStreamBuilder builder = - fileSys.newFSDataOutputStreamBuilder(path); + fileSys.createFile(path); FSDataOutputStream out = builder.build(); - String content = "Create with a generic type of createBuilder!"; + String content = "Create with a generic type of createFile!"; byte[] contentOrigin = content.getBytes("UTF8"); out.write(contentOrigin); out.close(); @@ -680,7 +680,7 @@ public class TestLocalFileSystem { // Test value not being set for replication, block size, buffer size // and permission FSDataOutputStreamBuilder builder = - fileSys.newFSDataOutputStreamBuilder(path); + fileSys.createFile(path); builder.build(); Assert.assertEquals("Should be default block size", builder.getBlockSize(), fileSys.getDefaultBlockSize()); @@ -694,8 +694,8 @@ public class TestLocalFileSystem { builder.getPermission(), FsPermission.getFileDefault()); // Test set 0 to replication, block size and buffer size - builder = fileSys.newFSDataOutputStreamBuilder(path); - builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0); + builder = fileSys.createFile(path); + builder.bufferSize(0).blockSize(0).replication((short) 0); Assert.assertEquals("Block size should be 0", builder.getBlockSize(), 0); Assert.assertEquals("Replication factor should be 0", http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 344f574..2f60e9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -101,7 +101,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import org.apache.commons.lang.StringUtils; import javax.annotation.Nonnull; @@ -526,6 +525,49 @@ public class DistributedFileSystem extends FileSystem { } /** + * Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long, + * Progressable, ChecksumOpt, InetSocketAddress[], String)}, it provides a + * HDFS-specific version of {@link #createNonRecursive(Path, FsPermission, + * EnumSet, int, short, long, Progressable)} with a few additions. + * + * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable, + * ChecksumOpt, InetSocketAddress[], String) for the descriptions of + * additional parameters, i.e., favoredNodes and ecPolicyName. + */ + private HdfsDataOutputStream createNonRecursive(final Path f, + final FsPermission permission, final EnumSet<CreateFlag> flag, + final int bufferSize, final short replication, final long blockSize, + final Progressable progress, final ChecksumOpt checksumOpt, + final InetSocketAddress[] favoredNodes, final String ecPolicyName) + throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); + Path absF = fixRelativePart(f); + return new FileSystemLinkResolver<HdfsDataOutputStream>() { + @Override + public HdfsDataOutputStream doCall(final Path p) throws IOException { + final DFSOutputStream out = dfs.create(getPathName(f), permission, + flag, false, replication, blockSize, progress, bufferSize, + checksumOpt, favoredNodes, ecPolicyName); + return dfs.createWrappedOutputStream(out, statistics); + } + @Override + public HdfsDataOutputStream next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem myDfs = (DistributedFileSystem)fs; + return myDfs.createNonRecursive(p, permission, flag, bufferSize, + replication, blockSize, progress, checksumOpt, favoredNodes, + ecPolicyName); + } + throw new UnsupportedOperationException("Cannot create with" + + " favoredNodes through a symlink to a non-DistributedFileSystem: " + + f + " -> " + p); + } + }.resolve(this, absF); + } + + /** * Same as create(), except fails if parent directory doesn't already exist. */ @Override @@ -2686,33 +2728,88 @@ public class DistributedFileSystem extends FileSystem { } /** - * Extends FSDataOutputStreamBuilder to support special requirements - * of DistributedFileSystem. + * HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to + * write file on HDFS. */ - public static class HdfsDataOutputStreamBuilder - extends FSDataOutputStreamBuilder { + public static final class HdfsDataOutputStreamBuilder + extends FSDataOutputStreamBuilder< + HdfsDataOutputStream, HdfsDataOutputStreamBuilder> { private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; private String ecPolicyName = null; - private boolean shouldReplicate = false; - public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { + /** + * Construct a HdfsDataOutputStream builder for a file. + * @param dfs the {@link DistributedFileSystem} instance. + * @param path the path of the file to create / append. + */ + private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { super(dfs, path); this.dfs = dfs; } - protected InetSocketAddress[] getFavoredNodes() { + @Override + protected HdfsDataOutputStreamBuilder getThisBuilder() { + return this; + } + + private InetSocketAddress[] getFavoredNodes() { return favoredNodes; } - public HdfsDataOutputStreamBuilder setFavoredNodes( + /** + * Set favored DataNodes. + * @param nodes the addresses of the favored DataNodes. + */ + public HdfsDataOutputStreamBuilder favoredNodes( @Nonnull final InetSocketAddress[] nodes) { Preconditions.checkNotNull(nodes); favoredNodes = nodes.clone(); return this; } - protected String getEcPolicyName() { + /** + * Force closed blocks to disk. + * + * @see CreateFlag for the details. + */ + public HdfsDataOutputStreamBuilder syncBlock() { + getFlags().add(CreateFlag.SYNC_BLOCK); + return this; + } + + /** + * Create the block on transient storage if possible. + * + * @see CreateFlag for the details. + */ + public HdfsDataOutputStreamBuilder lazyPersist() { + getFlags().add(CreateFlag.LAZY_PERSIST); + return this; + } + + /** + * Append data to a new block instead of the end of the last partial block. + * + * @see CreateFlag for the details. + */ + public HdfsDataOutputStreamBuilder newBlock() { + getFlags().add(CreateFlag.NEW_BLOCK); + return this; + } + + /** + * Advise that a block replica NOT be written to the local DataNode. + * + * @see CreateFlag for the details. + */ + public HdfsDataOutputStreamBuilder noLocalWrite() { + getFlags().add(CreateFlag.NO_LOCAL_WRITE); + return this; + } + + @VisibleForTesting + String getEcPolicyName() { return ecPolicyName; } @@ -2722,17 +2819,17 @@ public class DistributedFileSystem extends FileSystem { * or erasure coding policy is. Don't call this function and * enforceReplicate() in the same builder since they have conflict * of interest. - * */ - public HdfsDataOutputStreamBuilder setEcPolicyName( + public HdfsDataOutputStreamBuilder ecPolicyName( @Nonnull final String policyName) { Preconditions.checkNotNull(policyName); ecPolicyName = policyName; return this; } - public boolean shouldReplicate() { - return shouldReplicate; + @VisibleForTesting + boolean shouldReplicate() { + return getFlags().contains(CreateFlag.SHOULD_REPLICATE); } /** @@ -2742,30 +2839,46 @@ public class DistributedFileSystem extends FileSystem { * conflict of interest. */ public HdfsDataOutputStreamBuilder replicate() { - shouldReplicate = true; + getFlags().add(CreateFlag.SHOULD_REPLICATE); return this; } + @VisibleForTesting + @Override + protected EnumSet<CreateFlag> getFlags() { + return super.getFlags(); + } + + /** + * Build HdfsDataOutputStream to write. + * + * @return a fully-initialized OutputStream. + * @throws IOException on I/O errors. + */ @Override public HdfsDataOutputStream build() throws IOException { - Preconditions.checkState( - !(shouldReplicate() && (!StringUtils.isEmpty(getEcPolicyName()))), - "shouldReplicate and ecPolicyName are " + - "exclusive parameters. Set both is not allowed!"); - - EnumSet<CreateFlag> createFlags = getFlags(); - if (shouldReplicate()) { - createFlags.add(CreateFlag.SHOULD_REPLICATE); - } - return dfs.create(getPath(), getPermission(), createFlags, - getBufferSize(), getReplication(), getBlockSize(), - getProgress(), getChecksumOpt(), getFavoredNodes(), - getEcPolicyName()); + if (isRecursive()) { + return dfs.create(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), + getProgress(), getChecksumOpt(), getFavoredNodes(), + getEcPolicyName()); + } else { + return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), getProgress(), + getChecksumOpt(), getFavoredNodes(), getEcPolicyName()); + } } } + /** + * Create a HdfsDataOutputStreamBuilder to create a file on DFS. + * Similar to {@link #create(Path)}, file is overwritten by default. + * + * @param path the path of the file to create. + * @return A HdfsDataOutputStreamBuilder for creating a file. + */ @Override - public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { - return new HdfsDataOutputStreamBuilder(this, path); + public HdfsDataOutputStreamBuilder createFile(Path path) { + return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index e1b1005..be59cce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -25,7 +25,6 @@ import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.EnumSet; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -36,7 +35,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; @@ -246,10 +244,8 @@ public class NameNodeConnector implements Closeable { fs.delete(idPath, true); } - final FSDataOutputStream fsout = fs.newFSDataOutputStreamBuilder(idPath) - .replicate() - .setFlags(EnumSet.of(CreateFlag.CREATE)) - .build(); + final FSDataOutputStream fsout = fs.createFile(idPath) + .replicate().recursive().build(); Preconditions.checkState( fsout.hasCapability(StreamCapability.HFLUSH.getValue()) http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index e9af594..9857735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FileChecksum; @@ -71,6 +72,7 @@ import org.apache.hadoop.fs.StorageStatistics.LongStatistic; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; @@ -1411,36 +1413,88 @@ public class TestDistributedFileSystem { } } + private void testBuilderSetters(DistributedFileSystem fs) { + Path testFilePath = new Path("/testBuilderSetters"); + HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath); + + builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite() + .ecPolicyName("ec-policy"); + EnumSet<CreateFlag> flags = builder.getFlags(); + assertTrue(flags.contains(CreateFlag.APPEND)); + assertTrue(flags.contains(CreateFlag.CREATE)); + assertTrue(flags.contains(CreateFlag.NEW_BLOCK)); + assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE)); + assertFalse(flags.contains(CreateFlag.OVERWRITE)); + assertFalse(flags.contains(CreateFlag.SYNC_BLOCK)); + + assertEquals("ec-policy", builder.getEcPolicyName()); + assertFalse(builder.shouldReplicate()); + } + + @Test + public void testHdfsDataOutputStreamBuilderSetParameters() + throws IOException { + Configuration conf = getTestConfiguration(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + + testBuilderSetters(fs); + } + } + @Test public void testDFSDataOutputStreamBuilder() throws Exception { Configuration conf = getTestConfiguration(); - MiniDFSCluster cluster = null; String testFile = "/testDFSDataOutputStreamBuilder"; Path testFilePath = new Path(testFile); - try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { DistributedFileSystem fs = cluster.getFileSystem(); // Test create an empty file - FSDataOutputStream out = - fs.newFSDataOutputStreamBuilder(testFilePath).build(); - out.close(); + try (FSDataOutputStream out = + fs.createFile(testFilePath).build()) { + LOG.info("Test create an empty file"); + } // Test create a file with content, and verify the content String content = "This is a test!"; - out = fs.newFSDataOutputStreamBuilder(testFilePath) - .setBufferSize(4096).setReplication((short) 1) - .setBlockSize(4096).build(); - byte[] contentOrigin = content.getBytes("UTF8"); - out.write(contentOrigin); - out.close(); + try (FSDataOutputStream out1 = fs.createFile(testFilePath) + .bufferSize(4096) + .replication((short) 1) + .blockSize(4096) + .build()) { + byte[] contentOrigin = content.getBytes("UTF8"); + out1.write(contentOrigin); + } ContractTestUtils.verifyFileContents(fs, testFilePath, content.getBytes()); - } finally { - if (cluster != null) { - cluster.shutdown(); + + try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false) + .build()) { + fail("it should fail to overwrite an existing file"); + } catch (FileAlreadyExistsException e) { + // As expected, ignore. + } + + Path nonParentFile = new Path("/parent/test"); + try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) { + fail("parent directory not exist"); + } catch (FileNotFoundException e) { + // As expected. + } + assertFalse("parent directory should not be created", + fs.exists(new Path("/parent"))); + + try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive() + .build()) { + out.write(1); } + assertTrue("parent directory has not been created", + fs.exists(new Path("/parent"))); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java index 77e6594..4a4bed5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java @@ -540,15 +540,14 @@ public class TestErasureCodingPolicies { fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName()); // null EC policy name value means inheriting parent directory's policy - fs.newFSDataOutputStreamBuilder(filePath0).build().close(); + fs.createFile(filePath0).build().close(); ErasureCodingPolicy ecPolicyOnFile = fs.getErasureCodingPolicy(filePath0); assertEquals(EC_POLICY, ecPolicyOnFile); // Test illegal EC policy name final String illegalPolicyName = "RS-DEFAULT-1-2-64k"; try { - fs.newFSDataOutputStreamBuilder(filePath1) - .setEcPolicyName(illegalPolicyName).build().close(); + fs.createFile(filePath1).ecPolicyName(illegalPolicyName).build().close(); Assert.fail("illegal erasure coding policy should not be found"); } catch (Exception e) { GenericTestUtils.assertExceptionContains("Policy '" + illegalPolicyName @@ -563,8 +562,8 @@ public class TestErasureCodingPolicies { SystemErasureCodingPolicies.RS_3_2_POLICY_ID); ecPolicyOnFile = EC_POLICY; fs.setErasureCodingPolicy(dirPath, ecPolicyOnDir.getName()); - fs.newFSDataOutputStreamBuilder(filePath0) - .setEcPolicyName(ecPolicyOnFile.getName()).build().close(); + fs.createFile(filePath0).ecPolicyName(ecPolicyOnFile.getName()) + .build().close(); assertEquals(ecPolicyOnFile, fs.getErasureCodingPolicy(filePath0)); assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath)); fs.delete(dirPath, true); @@ -582,27 +581,27 @@ public class TestErasureCodingPolicies { fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName()); final String ecPolicyName = "RS-10-4-64k"; - fs.newFSDataOutputStreamBuilder(filePath).build().close(); + fs.createFile(filePath).build().close(); assertEquals(EC_POLICY, fs.getErasureCodingPolicy(filePath)); fs.delete(filePath, true); - fs.newFSDataOutputStreamBuilder(filePath) - .setEcPolicyName(ecPolicyName) + fs.createFile(filePath) + .ecPolicyName(ecPolicyName) .build() .close(); assertEquals(ecPolicyName, fs.getErasureCodingPolicy(filePath).getName()); fs.delete(filePath, true); try { - fs.newFSDataOutputStreamBuilder(filePath) - .setEcPolicyName(ecPolicyName) + fs.createFile(filePath) + .ecPolicyName(ecPolicyName) .replicate() .build().close(); Assert.fail("shouldReplicate and ecPolicyName are exclusive " + "parameters. Set both is not allowed."); }catch (Exception e){ - GenericTestUtils.assertExceptionContains("shouldReplicate and " + - "ecPolicyName are exclusive parameters. Set both is not allowed!", e); + GenericTestUtils.assertExceptionContains("SHOULD_REPLICATE flag and " + + "ecPolicyName are exclusive parameters.", e); } try { @@ -618,7 +617,7 @@ public class TestErasureCodingPolicies { "ecPolicyName are exclusive parameters. Set both is not allowed!", e); } - fs.newFSDataOutputStreamBuilder(filePath) + fs.createFile(filePath) .replicate() .build() .close(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e7cfdca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java index 50e56cc..3352fd0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd { InetSocketAddress[] dns = getDatanodes(rand); Path p = new Path("/filename"+i); FSDataOutputStream out = - dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build(); + dfs.createFile(p).favoredNodes(dns).build(); out.write(SOME_BYTES); out.close(); BlockLocation[] locations = getBlockLocations(p); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
