Repository: hadoop
Updated Branches:
refs/heads/branch-2 7d81b0bea -> abac844c9
HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)
(cherry picked from commit 5fbec46525d6d49837d934556b59ba77bd2301a8)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/abac844c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/abac844c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/abac844c
Branch: refs/heads/branch-2
Commit: abac844c90b5c9bff9f239032b96fd8e1e04dc15
Parents: 7d81b0b
Author: Lei Xu <[email protected]>
Authored: Wed Jun 14 23:17:53 2017 -0700
Committer: Lei Xu <[email protected]>
Committed: Thu Jun 15 10:59:11 2017 -0700
----------------------------------------------------------------------
.../hadoop/fs/FSDataOutputStreamBuilder.java | 171 ++++++++++++++-----
.../java/org/apache/hadoop/fs/FileSystem.java | 31 +++-
.../org/apache/hadoop/fs/FilterFileSystem.java | 4 +-
.../org/apache/hadoop/fs/HarFileSystem.java | 4 +-
.../apache/hadoop/fs/TestLocalFileSystem.java | 10 +-
.../hadoop/hdfs/DistributedFileSystem.java | 150 ++++++++++++++--
.../hdfs/server/balancer/NameNodeConnector.java | 5 +-
.../hadoop/hdfs/TestDistributedFileSystem.java | 80 +++++++--
.../namenode/TestFavoredNodesEndToEnd.java | 2 +-
9 files changed, 370 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
index 55836cc..0527202 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -18,36 +18,70 @@
package org.apache.hadoop.fs;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
+import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.EnumSet;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
-/** Base of specific file system FSDataOutputStreamBuilder. */
+/**
+ * Builder for {@link FSDataOutputStream} and its subclasses.
+ *
+ * It is used to create {@link FSDataOutputStream} when creating a new file or
+ * appending an existing file on {@link FileSystem}.
+ *
+ * By default, it does not create parent directory that do not exist.
+ * {@link FileSystem#createNonRecursive(Path, boolean, int, short, long,
+ * Progressable)}.
+ *
+ * To create missing parent directory, use {@link #recursive()}.
+ */
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class FSDataOutputStreamBuilder {
- private Path path = null;
+public abstract class FSDataOutputStreamBuilder
+ <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
+ private final FileSystem fs;
+ private final Path path;
private FsPermission permission = null;
- private Integer bufferSize;
- private Short replication;
- private Long blockSize;
+ private int bufferSize;
+ private short replication;
+ private long blockSize;
+ /** set to true to create missing directory. */
+ private boolean recursive = false;
+ private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
private Progressable progress = null;
- private EnumSet<CreateFlag> flags = null;
private ChecksumOpt checksumOpt = null;
- private final FileSystem fs;
-
- protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+ /**
+ * Return the concrete implementation of the builder instance.
+ */
+ protected abstract B getThisBuilder();
+
+ /**
+ * Constructor.
+ */
+ protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem,
+ @Nonnull Path p) {
+ Preconditions.checkNotNull(fileSystem);
+ Preconditions.checkNotNull(p);
fs = fileSystem;
path = p;
+ bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+ IO_FILE_BUFFER_SIZE_DEFAULT);
+ replication = fs.getDefaultReplication(path);
+ blockSize = fs.getDefaultBlockSize(p);
+ }
+
+ protected FileSystem getFS() {
+ return fs;
}
protected Path getPath() {
@@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder {
protected FsPermission getPermission() {
if (permission == null) {
- return FsPermission.getFileDefault();
+ permission = FsPermission.getFileDefault();
}
return permission;
}
- public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
+ /**
+ * Set permission for the file.
+ */
+ public B permission(@Nonnull final FsPermission perm) {
Preconditions.checkNotNull(perm);
permission = perm;
- return this;
+ return getThisBuilder();
}
protected int getBufferSize() {
- if (bufferSize == null) {
- return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
- IO_FILE_BUFFER_SIZE_DEFAULT);
- }
return bufferSize;
}
- public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
+ /**
+ * Set the size of the buffer to be used.
+ */
+ public B bufferSize(int bufSize) {
bufferSize = bufSize;
- return this;
+ return getThisBuilder();
}
protected short getReplication() {
- if (replication == null) {
- return fs.getDefaultReplication(getPath());
- }
return replication;
}
- public FSDataOutputStreamBuilder setReplication(short replica) {
+ /**
+ * Set replication factor.
+ */
+ public B replication(short replica) {
replication = replica;
- return this;
+ return getThisBuilder();
}
protected long getBlockSize() {
- if (blockSize == null) {
- return fs.getDefaultBlockSize(getPath());
- }
return blockSize;
}
- public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
+ /**
+ * Set block size.
+ */
+ public B blockSize(long blkSize) {
blockSize = blkSize;
- return this;
+ return getThisBuilder();
+ }
+
+ /**
+ * Return true to create the parent directories if they do not exist.
+ */
+ protected boolean isRecursive() {
+ return recursive;
+ }
+
+ /**
+ * Create the parent directory if they do not exist.
+ */
+ public B recursive() {
+ recursive = true;
+ return getThisBuilder();
}
protected Progressable getProgress() {
return progress;
}
- public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
+ /**
+ * Set the facility of reporting progress.
+ */
+ public B progress(@Nonnull final Progressable prog) {
Preconditions.checkNotNull(prog);
progress = prog;
- return this;
+ return getThisBuilder();
}
protected EnumSet<CreateFlag> getFlags() {
- if (flags == null) {
- return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
- }
return flags;
}
- public FSDataOutputStreamBuilder setFlags(
- final EnumSet<CreateFlag> enumFlags) {
- Preconditions.checkNotNull(enumFlags);
- flags = enumFlags;
- return this;
+ /**
+ * Create an FSDataOutputStream at the specified path.
+ */
+ public B create() {
+ flags.add(CreateFlag.CREATE);
+ return getThisBuilder();
+ }
+
+ /**
+ * Set to true to overwrite the existing file.
+ * Set it to false, an exception will be thrown when calling {@link #build()}
+ * if the file exists.
+ */
+ public B overwrite(boolean overwrite) {
+ if (overwrite) {
+ flags.add(CreateFlag.OVERWRITE);
+ } else {
+ flags.remove(CreateFlag.OVERWRITE);
+ }
+ return getThisBuilder();
+ }
+
+ /**
+ * Append to an existing file (optional operation).
+ */
+ public B append() {
+ flags.add(CreateFlag.APPEND);
+ return getThisBuilder();
}
protected ChecksumOpt getChecksumOpt() {
return checksumOpt;
}
- public FSDataOutputStreamBuilder setChecksumOpt(
- final ChecksumOpt chksumOpt) {
+ /**
+ * Set checksum opt.
+ */
+ public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
Preconditions.checkNotNull(chksumOpt);
checksumOpt = chksumOpt;
- return this;
+ return getThisBuilder();
}
- public FSDataOutputStream build() throws IOException {
- return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
- getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
- }
+ /**
+ * Create the FSDataOutputStream to write on the file system.
+ *
+ * @throws HadoopIllegalArgumentException if the parameters are not valid.
+ * @throws IOException on errors when file system creates or appends the file.
+ */
+ public abstract S build() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 89b6ba1..e1329f4 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -4124,8 +4124,34 @@ public abstract class FileSystem extends Configured
implements Closeable {
return GlobalStorageStatistics.INSTANCE;
}
+ private static final class FileSystemDataOutputStreamBuilder extends
+ FSDataOutputStreamBuilder<FSDataOutputStream,
+ FileSystemDataOutputStreamBuilder> {
+
+ /**
+ * Constructor.
+ */
+    protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+ super(fileSystem, p);
+ }
+
+ @Override
+ public FSDataOutputStream build() throws IOException {
+ return getFS().create(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+ getChecksumOpt());
+ }
+
+ @Override
+ protected FileSystemDataOutputStreamBuilder getThisBuilder() {
+ return this;
+ }
+ }
+
/**
* Create a new FSDataOutputStreamBuilder for the file with path.
+ * Files are overwritten by default.
+ *
* @param path file path
* @return a FSDataOutputStreamBuilder object to build the file
*
@@ -4133,7 +4159,8 @@ public abstract class FileSystem extends Configured
implements Closeable {
* builder interface becomes stable.
*/
@InterfaceAudience.Private
- protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return new FSDataOutputStreamBuilder(this, path);
+ protected FSDataOutputStreamBuilder createFile(Path path) {
+ return new FileSystemDataOutputStreamBuilder(this, path)
+ .create().overwrite(true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 3466922..e940065 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem {
}
@Override
- protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return fs.newFSDataOutputStreamBuilder(path);
+ public FSDataOutputStreamBuilder createFile(Path path) {
+ return fs.createFile(path);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index 6f57c7b..ec3a457 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem {
}
@Override
- public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return fs.newFSDataOutputStreamBuilder(path);
+ public FSDataOutputStreamBuilder createFile(Path path) {
+ return fs.createFile(path);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index a9f5a6e..f9b8cfa 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -654,9 +654,9 @@ public class TestLocalFileSystem {
try {
FSDataOutputStreamBuilder builder =
- fileSys.newFSDataOutputStreamBuilder(path);
+ fileSys.createFile(path);
FSDataOutputStream out = builder.build();
- String content = "Create with a generic type of createBuilder!";
+ String content = "Create with a generic type of createFile!";
byte[] contentOrigin = content.getBytes("UTF8");
out.write(contentOrigin);
out.close();
@@ -675,7 +675,7 @@ public class TestLocalFileSystem {
// Test value not being set for replication, block size, buffer size
// and permission
FSDataOutputStreamBuilder builder =
- fileSys.newFSDataOutputStreamBuilder(path);
+ fileSys.createFile(path);
builder.build();
Assert.assertEquals("Should be default block size",
builder.getBlockSize(), fileSys.getDefaultBlockSize());
@@ -689,8 +689,8 @@ public class TestLocalFileSystem {
builder.getPermission(), FsPermission.getFileDefault());
// Test set 0 to replication, block size and buffer size
- builder = fileSys.newFSDataOutputStreamBuilder(path);
- builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
+ builder = fileSys.createFile(path);
+ builder.bufferSize(0).blockSize(0).replication((short) 0);
Assert.assertEquals("Block size should be 0",
builder.getBlockSize(), 0);
Assert.assertEquals("Replication factor should be 0",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index a09c1b5..3e7c899 100644
---
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -101,6 +101,8 @@ import
org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import javax.annotation.Nonnull;
+
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop
@@ -541,6 +543,48 @@ public class DistributedFileSystem extends FileSystem {
}
/**
+ * Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long,
+ * Progressable, ChecksumOpt, InetSocketAddress[])}, it provides a
+ * HDFS-specific version of {@link #createNonRecursive(Path, FsPermission,
+ * EnumSet, int, short, long, Progressable)} with a few additions.
+ *
+ * @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
+ * ChecksumOpt, InetSocketAddress[]) for the descriptions of
+ * additional parameters, i.e., favoredNodes and ecPolicyName.
+ */
+ private HdfsDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress, final ChecksumOpt checksumOpt,
+ final InetSocketAddress[] favoredNodes)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ storageStatistics.incrementOpCounter(OpType.CREATE);
+ Path absF = fixRelativePart(f);
+ return new FileSystemLinkResolver<HdfsDataOutputStream>() {
+ @Override
+ public HdfsDataOutputStream doCall(final Path p) throws IOException {
+ final DFSOutputStream out = dfs.create(getPathName(f), permission,
+ flag, false, replication, blockSize, progress, bufferSize,
+ checksumOpt, favoredNodes);
+ return dfs.createWrappedOutputStream(out, statistics);
+ }
+ @Override
+ public HdfsDataOutputStream next(final FileSystem fs, final Path p)
+ throws IOException {
+ if (fs instanceof DistributedFileSystem) {
+ DistributedFileSystem myDfs = (DistributedFileSystem)fs;
+ return myDfs.createNonRecursive(p, permission, flag, bufferSize,
+ replication, blockSize, progress, checksumOpt, favoredNodes);
+ }
+ throw new UnsupportedOperationException("Cannot create with" +
+ " favoredNodes through a symlink to a non-DistributedFileSystem: "
+ + f + " -> " + p);
+ }
+ }.resolve(this, absF);
+ }
+
+ /**
* Same as create(), except fails if parent directory doesn't already exist.
*/
@Override
@@ -2603,40 +2647,120 @@ public class DistributedFileSystem extends FileSystem {
}
/**
- * Extends FSDataOutputStreamBuilder to support special requirements
- * of DistributedFileSystem.
+ * HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to
+ * write file on HDFS.
*/
- public static class HdfsDataOutputStreamBuilder
- extends FSDataOutputStreamBuilder {
+ public static final class HdfsDataOutputStreamBuilder
+ extends FSDataOutputStreamBuilder<
+ HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
private final DistributedFileSystem dfs;
private InetSocketAddress[] favoredNodes = null;
- public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
+ /**
+ * Construct a HdfsDataOutputStream builder for a file.
+ * @param dfs the {@link DistributedFileSystem} instance.
+ * @param path the path of the file to create / append.
+ */
+ private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
super(dfs, path);
this.dfs = dfs;
}
- protected InetSocketAddress[] getFavoredNodes() {
+ @Override
+ protected HdfsDataOutputStreamBuilder getThisBuilder() {
+ return this;
+ }
+
+ private InetSocketAddress[] getFavoredNodes() {
return favoredNodes;
}
- public HdfsDataOutputStreamBuilder setFavoredNodes(
- final InetSocketAddress[] nodes) {
+ /**
+ * Set favored DataNodes.
+ * @param nodes the addresses of the favored DataNodes.
+ */
+ public HdfsDataOutputStreamBuilder favoredNodes(
+ @Nonnull final InetSocketAddress[] nodes) {
Preconditions.checkNotNull(nodes);
favoredNodes = nodes.clone();
return this;
}
+ /**
+ * Force closed blocks to disk.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder syncBlock() {
+ getFlags().add(CreateFlag.SYNC_BLOCK);
+ return this;
+ }
+
+ /**
+ * Create the block on transient storage if possible.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder lazyPersist() {
+ getFlags().add(CreateFlag.LAZY_PERSIST);
+ return this;
+ }
+
+ /**
+ * Append data to a new block instead of the end of the last partial block.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder newBlock() {
+ getFlags().add(CreateFlag.NEW_BLOCK);
+ return this;
+ }
+
+ /**
+ * Advise that a block replica NOT be written to the local DataNode.
+ *
+ * @see CreateFlag for the details.
+ */
+ public HdfsDataOutputStreamBuilder noLocalWrite() {
+ getFlags().add(CreateFlag.NO_LOCAL_WRITE);
+ return this;
+ }
+
+ @VisibleForTesting
+ @Override
+ protected EnumSet<CreateFlag> getFlags() {
+ return super.getFlags();
+ }
+
+ /**
+ * Build HdfsDataOutputStream to write.
+ *
+ * @return a fully-initialized OutputStream.
+ * @throws IOException on I/O errors.
+ */
@Override
public HdfsDataOutputStream build() throws IOException {
- return dfs.create(getPath(), getPermission(), getFlags(),
- getBufferSize(), getReplication(), getBlockSize(),
- getProgress(), getChecksumOpt(), getFavoredNodes());
+ if (isRecursive()) {
+ return dfs.create(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(),
+ getProgress(), getChecksumOpt(), getFavoredNodes());
+ } else {
+ return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
+ getBufferSize(), getReplication(), getBlockSize(), getProgress(),
+ getChecksumOpt(), getFavoredNodes());
+ }
}
}
+ /**
+ * Create a HdfsDataOutputStreamBuilder to create a file on DFS.
+ * Similar to {@link #create(Path)}, file is overwritten by default.
+ *
+ * @param path the path of the file to create.
+ * @return A HdfsDataOutputStreamBuilder for creating a file.
+ */
@Override
- public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
- return new HdfsDataOutputStreamBuilder(this, path);
+ public HdfsDataOutputStreamBuilder createFile(Path path) {
+    return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 0041841..30d014d 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -242,7 +242,10 @@ public class NameNodeConnector implements Closeable {
IOUtils.closeStream(fs.append(idPath));
fs.delete(idPath, true);
}
- final FSDataOutputStream fsout = fs.create(idPath, false);
+
+ final FSDataOutputStream fsout = fs.createFile(idPath)
+ .overwrite(false).recursive().build();
+
// mark balancer idPath to be deleted during filesystem closure
fs.deleteOnExit(idPath);
if (write2IdFile) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 24aa1be..fe2dbb9 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FileChecksum;
@@ -78,6 +79,7 @@ import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@@ -1542,36 +1544,84 @@ public class TestDistributedFileSystem {
}
}
+ private void testBuilderSetters(DistributedFileSystem fs) {
+ Path testFilePath = new Path("/testBuilderSetters");
+ HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
+
+ builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite();
+ EnumSet<CreateFlag> flags = builder.getFlags();
+ assertTrue(flags.contains(CreateFlag.APPEND));
+ assertTrue(flags.contains(CreateFlag.CREATE));
+ assertTrue(flags.contains(CreateFlag.NEW_BLOCK));
+ assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
+ assertFalse(flags.contains(CreateFlag.OVERWRITE));
+ assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
+ }
+
+ @Test
+ public void testHdfsDataOutputStreamBuilderSetParameters()
+ throws IOException {
+ Configuration conf = getTestConfiguration();
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build()) {
+ cluster.waitActive();
+ DistributedFileSystem fs = cluster.getFileSystem();
+
+ testBuilderSetters(fs);
+ }
+ }
+
@Test
public void testDFSDataOutputStreamBuilder() throws Exception {
Configuration conf = getTestConfiguration();
- MiniDFSCluster cluster = null;
String testFile = "/testDFSDataOutputStreamBuilder";
Path testFilePath = new Path(testFile);
- try {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build()) {
DistributedFileSystem fs = cluster.getFileSystem();
// Test create an empty file
- FSDataOutputStream out =
- fs.newFSDataOutputStreamBuilder(testFilePath).build();
- out.close();
+ try (FSDataOutputStream out =
+ fs.createFile(testFilePath).build()) {
+ LOG.info("Test create an empty file");
+ }
// Test create a file with content, and verify the content
String content = "This is a test!";
- out = fs.newFSDataOutputStreamBuilder(testFilePath)
- .setBufferSize(4096).setReplication((short) 1)
- .setBlockSize(4096).build();
- byte[] contentOrigin = content.getBytes("UTF8");
- out.write(contentOrigin);
- out.close();
+ try (FSDataOutputStream out1 = fs.createFile(testFilePath)
+ .bufferSize(4096)
+ .replication((short) 1)
+ .blockSize(4096)
+ .build()) {
+ byte[] contentOrigin = content.getBytes("UTF8");
+ out1.write(contentOrigin);
+ }
ContractTestUtils.verifyFileContents(fs, testFilePath,
content.getBytes());
- } finally {
- if (cluster != null) {
- cluster.shutdown();
+
+      try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false)
+ .build()) {
+ fail("it should fail to overwrite an existing file");
+ } catch (FileAlreadyExistsException e) {
+ // As expected, ignore.
+ }
+
+ Path nonParentFile = new Path("/parent/test");
+ try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) {
+ fail("parent directory not exist");
+ } catch (FileNotFoundException e) {
+ // As expected.
+ }
+ assertFalse("parent directory should not be created",
+ fs.exists(new Path("/parent")));
+
+ try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive()
+ .build()) {
+ out.write(1);
}
+ assertTrue("parent directory has not been created",
+ fs.exists(new Path("/parent")));
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/abac844c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
----------------------------------------------------------------------
diff --git
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index 50e56cc..3352fd0 100644
---
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd {
InetSocketAddress[] dns = getDatanodes(rand);
Path p = new Path("/filename"+i);
FSDataOutputStream out =
- dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
+ dfs.createFile(p).favoredNodes(dns).build();
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]