[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16620433#comment-16620433 ]

ASF GitHub Bot commented on FLINK-9061:
---------------------------------------

StephanEwen closed pull request #6604: [FLINK-9061] Optionally add entropy to
checkpoint paths for better S3 scalability
URL: https://github.com/apache/flink/pull/6604

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d451109ddba..ba2113ae616 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -576,29 +576,8 @@ public boolean exists(final Path f) throws IOException {
        /**
         * Opens an FSDataOutputStream at the indicated Path.
         *
-        * <p>This method is deprecated, because most of its parameters are 
ignored by most file systems.
-        * To control for example the replication factor and block size in the 
Hadoop Distributed File system,
-        * make sure that the respective Hadoop configuration file is either 
linked from the Flink configuration,
-        * or in the classpath of either Flink or the user code.
-        *
-        * @param f
-        *        the file name to open
-        * @param overwrite
-        *        if a file with this name already exists, then if true,
-        *        the file will be overwritten, and if false an error will be 
thrown.
-        * @param bufferSize
-        *        the size of the buffer to be used.
-        * @param replication
-        *        required block replication for the file.
-        * @param blockSize
-        *        the size of the file blocks
-        *
-        * @throws IOException Thrown, if the stream could not be opened 
because of an I/O, or because
-        *                     a file already exists at that path and the write 
mode indicates to not
-        *                     overwrite the file.
-        *
-        * @deprecated Deprecated because not well supported across types of 
file systems.
-        *             Control the behavior of specific file systems via 
configurations instead.
+        * @deprecated Deprecated in favor of {@link #create(Path, WriteOptions)}, which offers better
+        *             extensibility for options that are supported by only some file system implementations.
         */
        @Deprecated
        public FSDataOutputStream create(
@@ -648,6 +627,25 @@ public FSDataOutputStream create(Path f, boolean 
overwrite) throws IOException {
         */
        public abstract FSDataOutputStream create(Path f, WriteMode 
overwriteMode) throws IOException;
 
+       /**
+        * Creates a new file at the given path and opens an FSDataOutputStream 
to that new file.
+        *
+        * <p>This method takes various options, some of which are not 
supported by all file systems
+        * (such as controlling block size).
+        *
+        * <p>Implementation note: This method should be abstract, but currently is not,
+        * in order to preserve backwards compatibility of this class with earlier Flink versions.
+        *
+        * @param f The path for the new file.
+        * @param options The options to parametrize the file and stream 
creation.
+        * @return The stream to the new file at the target path.
+        *
+        * @throws IOException Thrown if an error occurs while creating the 
file or opening the stream.
+        */
+       public FSDataOutputStream create(Path f, WriteOptions options) throws 
IOException {
+               return create(f, options.getOverwrite());
+       }
+
        /**
         * Renames the file/directory src to dst.
         *
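
For context, a minimal caller-side sketch of the new overload (the path and payload below are
made-up values); file systems that do not override create(Path, WriteOptions) fall back to
create(Path, WriteMode) through the default implementation added above:

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.WriteOptions;

    public class CreateWithWriteOptionsSketch {
        public static void main(String[] args) throws Exception {
            Path path = new Path("file:///tmp/flink-write-options-demo");  // hypothetical path
            FileSystem fs = path.getFileSystem();

            // delegates to create(path, WriteMode.OVERWRITE) unless the file system overrides the overload
            try (FSDataOutputStream out = fs.create(path, new WriteOptions().setOverwrite(WriteMode.OVERWRITE))) {
                out.write(42);
            }
        }
    }
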
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
new file mode 100644
index 00000000000..70f4973dc14
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/WriteOptions.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Write options that can be passed to the methods that write files.
+ */
+@Public
+public class WriteOptions {
+
+       private WriteMode overwrite = WriteMode.NO_OVERWRITE;
+
+       @Nullable
+       private BlockOptions blockSettings;
+
+       private boolean injectEntropy;
+
+       // 
------------------------------------------------------------------------
+       //  getters & setters
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Gets the overwrite option.
+        */
+       public WriteMode getOverwrite() {
+               return overwrite;
+       }
+
+       /**
+        * Sets the overwrite option.
+        *
+        * <p>Method returns this object for fluent function call chaining.
+        */
+       public WriteOptions setOverwrite(WriteMode overwrite) {
+               this.overwrite = checkNotNull(overwrite);
+               return this;
+       }
+
+       /**
+        * Gets the block writing settings, like size and replication factor.
+        * Returns null if no settings are defined.
+        */
+       @Nullable
+       public BlockOptions getBlockSettings() {
+               return blockSettings;
+       }
+
+       /**
+        * Sets the block settings, for file systems working with block replication and
+        * exposing those settings.
+        *
+        * <p>Method returns this object for fluent function call chaining.
+        */
+       public WriteOptions setBlockSettings(@Nullable BlockOptions 
blockSettings) {
+               this.blockSettings = blockSettings;
+               return this;
+       }
+
+       /**
+        * Gets whether to inject entropy into the path.
+        */
+       public boolean isInjectEntropy() {
+               return injectEntropy;
+       }
+
+       /**
+        * Sets whether to inject entropy into the path.
+        *
+        * <p>Entropy injection is only supported by select file systems (like S3) to overcome
+        * scalability issues in their sharding. For this option to have any effect, the
+        * file system must be configured to replace an entropy key with entropy, and the
+        * path that is written to must contain the entropy key.
+        *
+        * <p>Method returns this object for fluent function call chaining.
+        */
+       public WriteOptions setInjectEntropy(boolean injectEntropy) {
+               this.injectEntropy = injectEntropy;
+               return this;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  nested options classes
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Settings for block replication. Interpreted only by file systems that
+        * expose block replication settings.
+        */
+       @Public
+       public static class BlockOptions {
+
+               /** The size of the blocks, in bytes. */
+               private long blockSize;
+
+               /** The number of times the block should be replicated. */
+               private int replicationFactor;
+
+               public BlockOptions(long blockSize, int replicationFactor) {
+                       checkArgument(blockSize > 0, "blockSize must be >0");
+                       checkArgument(replicationFactor > 0, "replicationFactor 
must be >=1");
+
+                       this.blockSize = blockSize;
+                       this.replicationFactor = replicationFactor;
+               }
+
+               /**
+                * Gets the block size, in bytes.
+                */
+               public long getBlockSize() {
+                       return blockSize;
+               }
+
+               /**
+                * Gets the number of times the block should be replicated.
+                */
+               public int getReplicationFactor() {
+                       return replicationFactor;
+               }
+       }
+}
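
A rough usage sketch of the options class above (the block size, replication factor, and
entropy flag are arbitrary example values; block settings are honored only by file systems
that expose them):

    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.core.fs.WriteOptions;
    import org.apache.flink.core.fs.WriteOptions.BlockOptions;

    public class WriteOptionsSketch {
        public static void main(String[] args) {
            WriteOptions options = new WriteOptions()
                    .setOverwrite(WriteMode.NO_OVERWRITE)
                    .setBlockSettings(new BlockOptions(64 * 1024 * 1024, 3))  // 64 MiB blocks, 3 replicas
                    .setInjectEntropy(true);  // only effective on entropy-aware file systems

            System.out.println(options.getBlockSettings().getBlockSize());  // 67108864
            System.out.println(options.isInjectEntropy());                  // true
        }
    }
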
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index b9ff319f7b3..aeb49cc0076 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -32,6 +32,7 @@
 import java.util.Random;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -248,6 +249,37 @@ public static String getRandomString(Random rnd, int 
minLength, int maxLength, c
                return new String(data);
        }
 
+       /**
+        * Appends a random alphanumeric string of given length to the given 
string buffer.
+        *
+        * @param rnd The random number generator to use.
+        * @param buffer The buffer to append to.
+        * @param length The number of alphanumeric characters to append.
+        */
+       public static void appendRandomAlphanumericString(Random rnd, 
StringBuilder buffer, int length) {
+               checkNotNull(rnd);
+               checkArgument(length >= 0);
+
+               for (int i = 0; i < length; i++) {
+                       buffer.append(nextAlphanumericChar(rnd));
+               }
+       }
+
+       private static char nextAlphanumericChar(Random rnd) {
+               int which = rnd.nextInt(62);
+               char c;
+               if (which < 10) {
+                       c = (char) ('0' + which);
+               }
+               else if (which < 36) {
+                       c = (char) ('A' - 10 + which);
+               }
+               else {
+                       c = (char) ('a' - 36 + which);
+               }
+               return c;
+       }
+
        /**
         * Writes a String to the given output.
         * The written string can be read with {@link 
#readString(DataInputView)}.
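
A small sketch of the new helper; it draws from the 62-character alphabet [0-9A-Za-z], so a
4-character suffix yields 62^4 (roughly 14.8 million) distinct values (the bucket path below
is made up):

    import org.apache.flink.util.StringUtils;

    import java.util.concurrent.ThreadLocalRandom;

    public class AppendEntropySketch {
        public static void main(String[] args) {
            StringBuilder path = new StringBuilder("s3://bucket/checkpoints/");
            StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), path, 4);

            // prints e.g. "s3://bucket/checkpoints/x7Qz" (the suffix differs per run)
            System.out.println(path);
        }
    }
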
diff --git 
a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
index 5f705b4c834..1c9abf28eb8 100644
--- a/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/StringUtilsTest.java
@@ -20,8 +20,11 @@
 
 import org.junit.Test;
 
+import java.util.Random;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the {@link StringUtils}.
@@ -56,4 +59,15 @@ public void testHexArrayToString() {
                String hex = StringUtils.byteToHexString(byteArray);
                assertEquals("019f314a", hex);
        }
+
+       @Test
+       public void testAppendAlphanumeric() {
+               StringBuilder bld = new StringBuilder();
+               StringUtils.appendRandomAlphanumericString(new Random(), bld, 
256);
+               String str = bld.toString();
+
+               if (!str.matches("[a-zA-Z0-9]+")) {
+                       fail("Not alphanumeric: " + str);
+               }
+       }
 }
diff --git 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 065ba5a66f8..bceed5e95a8 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.fs.hdfs;
 
 import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.core.fs.WriteOptions.BlockOptions;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.Locale;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -36,6 +40,9 @@
  */
 public class HadoopFileSystem extends FileSystem {
 
+       /** The write buffer size used by default. */
+       public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+
        /** The wrapped Hadoop File System. */
        private final org.apache.hadoop.fs.FileSystem fs;
 
@@ -142,6 +149,24 @@ public HadoopDataOutputStream create(final Path f, final 
WriteMode overwrite) th
                return new HadoopDataOutputStream(fsDataOutputStream);
        }
 
+       @Override
+       public FSDataOutputStream create(Path f, WriteOptions options) throws 
IOException {
+               BlockOptions blockSettings = options.getBlockSettings();
+               if (blockSettings == null) {
+                       return create(f, options.getOverwrite());
+               }
+               else {
+                       checkArgument(blockSettings.getReplicationFactor() <= 
Short.MAX_VALUE,
+                                       "block replication factor out of 
bounds");
+
+                       return create(f,
+                                       options.getOverwrite() == 
WriteMode.OVERWRITE,
+                                       DEFAULT_WRITE_BUFFER_SIZE,
+                                       (short) 
blockSettings.getReplicationFactor(),
+                                       blockSettings.getBlockSize());
+               }
+       }
+
        @Override
        public boolean delete(final Path f, final boolean recursive) throws 
IOException {
                return this.fs.delete(toHadoopPath(f), recursive);
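
A sketch of what the override above amounts to: with block settings present, it goes through
the Hadoop-specific create variant using the fixed 4 KiB write buffer; without them, it falls
back to the plain WriteMode-based create (the numbers below are illustrative only):

    import java.io.IOException;

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.WriteOptions;
    import org.apache.flink.core.fs.WriteOptions.BlockOptions;
    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;

    public class HadoopBlockSettingsSketch {
        static FSDataOutputStream createReplicated(HadoopFileSystem fs, Path path) throws IOException {
            // resolves to fs.create(path, true, DEFAULT_WRITE_BUFFER_SIZE, (short) 2, 128 MiB)
            return fs.create(path, new WriteOptions()
                    .setOverwrite(WriteMode.OVERWRITE)
                    .setBlockSettings(new BlockOptions(128L * 1024 * 1024, 2)));
        }
    }
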
diff --git 
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c94a62..230d18bdcdb 100644
--- 
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -19,12 +19,20 @@
 package org.apache.flink.fs.s3presto;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.fs.hdfs.AbstractFileSystemFactory;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
@@ -34,7 +42,31 @@
 /**
  * Simple factory for the S3 file system.
  */
-public class S3FileSystemFactory extends AbstractFileSystemFactory {
+public class S3FileSystemFactory implements FileSystemFactory {
+
+       /**
+        * The substring to be replaced by random entropy in checkpoint paths.
+        */
+       public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = 
ConfigOptions
+                       .key("s3.entropy.key")
+                       .noDefaultValue()
+                       .withDescription(
+                                       "This option can be used to improve 
performance due to sharding issues on Amazon S3. " +
+                                       "For file creations with entropy 
injection, this key will be replaced by random " +
+                                       "alphanumeric characters. For other 
file creations, the key will be filtered out.");
+
+       /**
+        * The number of entropy characters, in case entropy injection is 
configured.
+        */
+       public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION 
= ConfigOptions
+                       .key("s3.entropy.length")
+                       .defaultValue(4)
+                       .withDescription(
+                                       "When '" + 
ENTROPY_INJECT_KEY_OPTION.key() + "' is set, this option defines the number of 
" +
+                                       "random characters to replace the 
entropy key with.");
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(S3FileSystemFactory.class);
+
        private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
                new HashSet<>(Collections.singletonList("com.amazonaws."));
 
@@ -50,8 +82,55 @@
                        { "presto.s3.secret.key", "presto.s3.secret-key" }
        };
 
-       public S3FileSystemFactory() {
-               super("Presto S3 File System", createHadoopConfigLoader());
+       private static final String INVALID_ENTROPY_KEY_CHARS = 
"^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$";
+
+       private final HadoopConfigLoader hadoopConfigLoader = 
createHadoopConfigLoader();
+
+       private Configuration flinkConfig;
+
+       @Override
+       public void configure(Configuration config) {
+               flinkConfig = config;
+               hadoopConfigLoader.setFlinkConfig(config);
+       }
+
+       @Override
+       public FileSystem create(URI fsUri) throws IOException {
+               LOG.debug("Creating S3 FileSystem backed by Presto S3 
FileSystem");
+               LOG.debug("Loading Hadoop configuration for Presto S3 File 
System");
+
+               try {
+                       // instantiate the presto file system
+                       org.apache.hadoop.conf.Configuration hadoopConfig = 
hadoopConfigLoader.getOrLoadHadoopConfig();
+                       org.apache.hadoop.fs.FileSystem fs = new 
PrestoS3FileSystem();
+                       fs.initialize(getInitURI(fsUri, hadoopConfig), 
hadoopConfig);
+
+                       // load the entropy injection settings
+                       String entropyInjectionKey = 
flinkConfig.getString(ENTROPY_INJECT_KEY_OPTION);
+                       int numEntropyChars = -1;
+
+                       if (entropyInjectionKey != null) {
+                               if 
(entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) {
+                                       throw new 
IllegalConfigurationException("Invalid character in value for " +
+                                                       
ENTROPY_INJECT_KEY_OPTION.key() + " : " + entropyInjectionKey);
+                               }
+
+                               numEntropyChars = 
flinkConfig.getInteger(ENTROPY_INJECT_LENGTH_OPTION);
+
+                               if (numEntropyChars <= 0) {
+                                       throw new IllegalConfigurationException(
+                                                       
ENTROPY_INJECT_LENGTH_OPTION.key() + " must configure a value > 0");
+                               }
+                       }
+
+                       return new S3PrestoFileSystem(fs, entropyInjectionKey, 
numEntropyChars);
+               }
+               catch (IOException e) {
+                       throw e;
+               }
+               catch (Exception e) {
+                       throw new IOException(e.getMessage(), e);
+               }
        }
 
        @Override
@@ -65,13 +144,7 @@ static HadoopConfigLoader createHadoopConfigLoader() {
                        "presto.s3.", PACKAGE_PREFIXES_TO_SHADE, 
CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
        }
 
-       @Override
-       protected org.apache.hadoop.fs.FileSystem createHadoopFileSystem() {
-               return new PrestoS3FileSystem();
-       }
-
-       @Override
-       protected URI getInitURI(URI fsUri, 
org.apache.hadoop.conf.Configuration hadoopConfig) {
+       static URI getInitURI(URI fsUri, org.apache.hadoop.conf.Configuration 
hadoopConfig) {
                final String scheme = fsUri.getScheme();
                final String authority = fsUri.getAuthority();
                final URI initUri;
@@ -88,10 +161,11 @@ else if (scheme != null && authority == null) {
                return initUri;
        }
 
-       private URI createURI(String str) {
+       static URI createURI(String str) {
                try {
                        return new URI(str);
-               } catch (URISyntaxException e) {
+               }
+               catch (URISyntaxException e) {
                        throw new FlinkRuntimeException("Error in s3 aws URI - 
" + str, e);
                }
        }
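
A sketch of how entropy injection would be enabled for this factory through the Flink
configuration; the key and length values are examples, and checkpoint paths must then contain
the configured key (e.g. "s3://bucket/checkpoints/_entropy_/..."):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;

    public class EntropyConfigSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setString("s3.entropy.key", "_entropy_");  // substring to be replaced by random characters
            conf.setInteger("s3.entropy.length", 4);        // number of random characters to inject

            // lets the registered factories (including the S3 factory above) pick up the settings
            FileSystem.initialize(conf);
        }
    }
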
diff --git 
a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
 
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
new file mode 100644
index 00000000000..e6a6ae4990c
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3PrestoFileSystem.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A Flink FileSystem against S3, wrapping the Presto Hadoop S3 File System 
implementation.
+ *
+ * <p>This class is heavily based on the {@link org.apache.flink.runtime.fs.hdfs.HadoopFileSystem} class.
+ * Code is copied here for the sake of minimal changes to the original class within a minor release.
+ */
+class S3PrestoFileSystem extends FileSystem {
+
+       /** The wrapped Hadoop File System. */
+       private final org.apache.hadoop.fs.FileSystem fs;
+
+       @Nullable
+       private final String entropyInjectionKey;
+
+       private final int entropyLength;
+
+       /**
+        * Wraps the given Hadoop File System object as a Flink File System 
object.
+        * The given Hadoop file system object is expected to be initialized 
already.
+        *
+        * @param hadoopFileSystem The Hadoop FileSystem that will be used 
under the hood.
+        */
+       public S3PrestoFileSystem(org.apache.hadoop.fs.FileSystem 
hadoopFileSystem) {
+               this(hadoopFileSystem, null, -1);
+       }
+
+       /**
+        * Wraps the given Hadoop File System object as a Flink File System 
object.
+        * The given Hadoop file system object is expected to be initialized 
already.
+        *
+        * <p>This constructor additionally configures the entropy injection 
for the file system.
+        *
+        * @param hadoopFileSystem The Hadoop FileSystem that will be used 
under the hood.
+        * @param entropyInjectionKey The substring that will be replaced by 
entropy or removed.
+        * @param entropyLength The number of random alphanumeric characters to 
inject as entropy.
+        */
+       public S3PrestoFileSystem(
+                       org.apache.hadoop.fs.FileSystem hadoopFileSystem,
+                       @Nullable String entropyInjectionKey,
+                       int entropyLength) {
+
+               if (entropyInjectionKey != null && entropyLength <= 0) {
+                       throw new IllegalArgumentException("Entropy length must be > 0 when entropy injection key is set");
+               }
+
+               this.fs = checkNotNull(hadoopFileSystem, "hadoopFileSystem");
+               this.entropyInjectionKey = entropyInjectionKey;
+               this.entropyLength = entropyLength;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  properties
+       // 
------------------------------------------------------------------------
+
+       public org.apache.hadoop.fs.FileSystem getHadoopFileSystem() {
+               return fs;
+       }
+
+       @Nullable
+       public String getEntropyInjectionKey() {
+               return entropyInjectionKey;
+       }
+
+       public int getEntropyLength() {
+               return entropyLength;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  file system methods
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public Path getWorkingDirectory() {
+               return new Path(fs.getWorkingDirectory().toUri());
+       }
+
+       public Path getHomeDirectory() {
+               return new Path(fs.getHomeDirectory().toUri());
+       }
+
+       @Override
+       public URI getUri() {
+               return fs.getUri();
+       }
+
+       @Override
+       public FileStatus getFileStatus(final Path f) throws IOException {
+               org.apache.hadoop.fs.FileStatus status = 
fs.getFileStatus(toHadoopPath(f));
+               return new HadoopFileStatus(status);
+       }
+
+       @Override
+       public BlockLocation[] getFileBlockLocations(
+                       final FileStatus file,
+                       final long start,
+                       final long len) throws IOException {
+
+               if (!(file instanceof HadoopFileStatus)) {
+                       throw new IOException("file is not an instance of 
HadoopFileStatus");
+               }
+
+               final HadoopFileStatus f = (HadoopFileStatus) file;
+
+               final org.apache.hadoop.fs.BlockLocation[] blkLocations = 
fs.getFileBlockLocations(f.getInternalFileStatus(),
+                               start, len);
+
+               // Wrap up HDFS specific block location objects
+               final HadoopBlockLocation[] distBlkLocations = new 
HadoopBlockLocation[blkLocations.length];
+               for (int i = 0; i < distBlkLocations.length; i++) {
+                       distBlkLocations[i] = new 
HadoopBlockLocation(blkLocations[i]);
+               }
+
+               return distBlkLocations;
+       }
+
+       @Override
+       public HadoopDataInputStream open(final Path f, final int bufferSize) 
throws IOException {
+               final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+               final org.apache.hadoop.fs.FSDataInputStream fdis = 
this.fs.open(path, bufferSize);
+               return new HadoopDataInputStream(fdis);
+       }
+
+       @Override
+       public HadoopDataInputStream open(final Path f) throws IOException {
+               final org.apache.hadoop.fs.Path path = toHadoopPath(f);
+               final org.apache.hadoop.fs.FSDataInputStream fdis = 
fs.open(path);
+               return new HadoopDataInputStream(fdis);
+       }
+
+       @Override
+       public HadoopDataOutputStream create(final Path f, final WriteMode 
overwrite) throws IOException {
+               final org.apache.hadoop.fs.FSDataOutputStream 
fsDataOutputStream =
+                               fs.create(toHadoopPath(f), overwrite == 
WriteMode.OVERWRITE);
+               return new HadoopDataOutputStream(fsDataOutputStream);
+       }
+
+       @Override
+       public FSDataOutputStream create(final Path f, final WriteOptions 
options) throws IOException {
+               final org.apache.hadoop.fs.Path path = options.isInjectEntropy()
+                               ? toHadoopPathInjectEntropy(f)
+                               : toHadoopPath(f);
+
+               final org.apache.hadoop.fs.FSDataOutputStream 
fsDataOutputStream = fs.create(
+                               path, options.getOverwrite() == 
WriteMode.OVERWRITE);
+
+               return new HadoopDataOutputStream(fsDataOutputStream);
+       }
+
+       @Override
+       public boolean delete(final Path f, final boolean recursive) throws 
IOException {
+               return fs.delete(toHadoopPath(f), recursive);
+       }
+
+       @Override
+       public boolean exists(Path f) throws IOException {
+               return fs.exists(toHadoopPath(f));
+       }
+
+       @Override
+       public FileStatus[] listStatus(final Path f) throws IOException {
+               final org.apache.hadoop.fs.FileStatus[] hadoopFiles = 
fs.listStatus(toHadoopPath(f));
+               final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+               // Convert types
+               for (int i = 0; i < files.length; i++) {
+                       files[i] = new HadoopFileStatus(hadoopFiles[i]);
+               }
+
+               return files;
+       }
+
+       @Override
+       public boolean mkdirs(final Path f) throws IOException {
+               return fs.mkdirs(toHadoopPath(f));
+       }
+
+       @Override
+       public boolean rename(final Path src, final Path dst) throws 
IOException {
+               return fs.rename(toHadoopPath(src), toHadoopPath(dst));
+       }
+
+       @SuppressWarnings("deprecation")
+       @Override
+       public long getDefaultBlockSize() {
+               return fs.getDefaultBlockSize();
+       }
+
+       @Override
+       public boolean isDistributedFS() {
+               return true;
+       }
+
+       @Override
+       public FileSystemKind getKind() {
+               return FileSystemKind.OBJECT_STORE;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  entropy utilities
+       // 
------------------------------------------------------------------------
+
+       @VisibleForTesting
+       org.apache.hadoop.fs.Path toHadoopPath(Path path) throws IOException {
+               return rewritePathForEntropyKey(path, false);
+       }
+
+       @VisibleForTesting
+       org.apache.hadoop.fs.Path toHadoopPathInjectEntropy(Path path) throws 
IOException {
+               return rewritePathForEntropyKey(path, true);
+       }
+
+       private org.apache.hadoop.fs.Path rewritePathForEntropyKey(Path path, 
boolean addEntropy) throws IOException {
+               if (entropyInjectionKey == null) {
+                       return convertToHadoopPath(path);
+               }
+               else {
+                       final URI originalUri = path.toUri();
+                       final String checkpointPath = originalUri.getPath();
+
+                       final int indexOfKey = 
checkpointPath.indexOf(entropyInjectionKey);
+                       if (indexOfKey == -1) {
+                               return convertToHadoopPath(path);
+                       }
+                       else {
+                               final StringBuilder buffer = new 
StringBuilder(checkpointPath.length());
+                               buffer.append(checkpointPath, 0, indexOfKey);
+
+                               if (addEntropy) {
+                                       
StringUtils.appendRandomAlphanumericString(ThreadLocalRandom.current(), buffer, 
entropyLength);
+                               }
+
+                               buffer.append(checkpointPath, indexOfKey + 
entropyInjectionKey.length(), checkpointPath.length());
+
+                               final String rewrittenPath = buffer.toString();
+                               try {
+                                       return convertToHadoopPath(new URI(
+                                                       originalUri.getScheme(),
+                                                       
originalUri.getAuthority(),
+                                                       rewrittenPath,
+                                                       originalUri.getQuery(),
+                                                       
originalUri.getFragment()));
+                               }
+                               catch (URISyntaxException e) {
+                                       // this should actually never happen, 
because the URI was valid before
+                                       throw new IOException("URI format error 
while processing path for entropy injection", e);
+                               }
+                       }
+               }
+       }
+
+       private static org.apache.hadoop.fs.Path convertToHadoopPath(URI uri) {
+               return new org.apache.hadoop.fs.Path(uri);
+       }
+
+       private static org.apache.hadoop.fs.Path convertToHadoopPath(Path path) 
{
+               return convertToHadoopPath(path.toUri());
+       }
+}
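
To illustrate the path rewriting above (the paths are hypothetical and the injected characters
vary per file), a sketch assuming the wrapped file system was built with entropy key "_entropy_"
and length 4:

    import java.io.IOException;

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.WriteOptions;

    public class EntropyRewriteSketch {
        static FSDataOutputStream writeStateFile(FileSystem fs) throws IOException {
            Path path = new Path("s3://bucket/checkpoints/_entropy_/job-42/chk-17/state-file");

            // entropy requested:     s3://bucket/checkpoints/Gz4p/job-42/chk-17/state-file  (random part varies)
            // entropy not requested: s3://bucket/checkpoints/job-42/chk-17/state-file       (key removed, "//" normalized away)
            return fs.create(path, new WriteOptions().setInjectEntropy(true));
        }
    }
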
diff --git 
a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
 
b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
new file mode 100644
index 00000000000..587b02e4010
--- /dev/null
+++ 
b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemEntropyTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3presto;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the entropy injection in the {@link S3PrestoFileSystem}.
+ */
+public class PrestoS3FileSystemEntropyTest {
+
+       @Test
+       public void testEmptyPath() throws Exception {
+               Path path = new Path("hdfs://localhost:12345");
+               S3PrestoFileSystem fs = createFs("test", 4);
+
+               assertEquals(toHadoopPath(path), 
fs.toHadoopPathInjectEntropy(path));
+               assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+       }
+
+       @Test
+       public void testFullUriNonMatching() throws Exception {
+               Path path = new 
Path("s3://hugo@myawesomehost:55522/path/to/the/file");
+               S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+               assertEquals(toHadoopPath(path), 
fs.toHadoopPathInjectEntropy(path));
+               assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+       }
+
+       @Test
+       public void testFullUriMatching() throws Exception {
+               Path path = new 
Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+               S3PrestoFileSystem fs = createFs("s0mek3y", 8);
+
+               org.apache.hadoop.fs.Path withEntropy = 
fs.toHadoopPathInjectEntropy(path);
+               org.apache.hadoop.fs.Path withoutEntropy = 
fs.toHadoopPath(path);
+
+               validateMatches(withEntropy, 
"s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{8}/the/file");
+               assertEquals(new 
org.apache.hadoop.fs.Path("s3://hugo@myawesomehost:55522/path/the/file"), 
withoutEntropy);
+       }
+
+       @Test
+       public void testPathOnlyNonMatching() throws Exception {
+               Path path = new Path("/path/file");
+               S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+               assertEquals(toHadoopPath(path), 
fs.toHadoopPathInjectEntropy(path));
+               assertEquals(toHadoopPath(path), fs.toHadoopPath(path));
+       }
+
+       @Test
+       public void testPathOnlyMatching() throws Exception {
+               Path path = new Path("/path/_entropy_key_/file");
+               S3PrestoFileSystem fs = createFs("_entropy_key_", 4);
+
+               org.apache.hadoop.fs.Path withEntropy = 
fs.toHadoopPathInjectEntropy(path);
+               org.apache.hadoop.fs.Path withoutEntropy = 
fs.toHadoopPath(path);
+
+               validateMatches(withEntropy, "/path/[a-zA-Z0-9]{4}/file");
+               assertEquals(new org.apache.hadoop.fs.Path("/path/file"), 
withoutEntropy);
+       }
+
+       @Test
+       public void testEntropyNotFullSegment() throws Exception {
+               Path path = new 
Path("s3://myhost:122/entropy-_entropy_key_-suffix/file");
+               S3PrestoFileSystem fs = createFs("_entropy_key_", 3);
+
+               org.apache.hadoop.fs.Path withEntropy = 
fs.toHadoopPathInjectEntropy(path);
+               org.apache.hadoop.fs.Path withoutEntropy = 
fs.toHadoopPath(path);
+
+               validateMatches(withEntropy, 
"s3://myhost:122/entropy-[a-zA-Z0-9]{3}-suffix/file");
+               assertEquals(new 
org.apache.hadoop.fs.Path("s3://myhost:122/entropy--suffix/file"), 
withoutEntropy);
+       }
+
+       @Test
+       public void testWriteOptionWithEntropy() throws Exception {
+               FileSystem underlyingFs = mock(FileSystem.class);
+               when(underlyingFs.create(any(org.apache.hadoop.fs.Path.class), 
anyBoolean())).thenReturn(mock(FSDataOutputStream.class));
+               ArgumentCaptor<org.apache.hadoop.fs.Path> pathCaptor = 
ArgumentCaptor.forClass(org.apache.hadoop.fs.Path.class);
+
+               Path path = new 
Path("s3://hugo@myawesomehost:55522/path/s0mek3y/the/file");
+               S3PrestoFileSystem fs = new S3PrestoFileSystem(underlyingFs, 
"s0mek3y", 11);
+
+               fs.create(path, new WriteOptions().setInjectEntropy(true));
+               verify(underlyingFs).create(pathCaptor.capture(), anyBoolean());
+
+               validateMatches(pathCaptor.getValue(), 
"s3://hugo@myawesomehost:55522/path/[a-zA-Z0-9]{11}/the/file");
+       }
+
+       private static void validateMatches(org.apache.hadoop.fs.Path path, 
String pattern) {
+               if (!path.toString().matches(pattern)) {
+                       fail("Path " + path + " does not match " + pattern);
+               }
+       }
+
+       private static S3PrestoFileSystem createFs(String entropyKey, int 
entropyLen) {
+               return new S3PrestoFileSystem(mock(FileSystem.class), 
entropyKey, entropyLen);
+       }
+
+       private org.apache.hadoop.fs.Path toHadoopPath(Path path) {
+               return new org.apache.hadoop.fs.Path(path.toUri());
+       }
+}
diff --git 
a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
 
b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 4eeb2d4512a..9afad5742d3 100644
--- 
a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ 
b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,12 +21,12 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.fs.hdfs.HadoopConfigLoader;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.facebook.presto.hive.PrestoS3FileSystem;
+import org.junit.After;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -41,6 +41,11 @@
  */
 public class PrestoS3FileSystemTest {
 
+       @After
+       public void resetFileSystemConfig() throws Exception {
+               FileSystem.initialize(new Configuration());
+       }
+
        @Test
        public void testConfigPropagation() throws Exception{
                final Configuration conf = new Configuration();
@@ -90,14 +95,29 @@ public void testShadingOfAwsCredProviderConfig() {
                        hadoopConfig.get("presto.s3.credentials-provider"));
        }
 
+       @Test
+       public void testEntropyInjectionConfig() throws Exception {
+               final Configuration conf = new Configuration();
+               conf.setString("s3.entropy.key", "__entropy__");
+               conf.setInteger("s3.entropy.length", 7);
+
+               FileSystem.initialize(conf);
+
+               FileSystem fs = FileSystem.get(new URI("s3://test"));
+               S3PrestoFileSystem s3fs = (S3PrestoFileSystem) fs;
+
+               assertEquals("__entropy__", s3fs.getEntropyInjectionKey());
+               assertEquals(7, s3fs.getEntropyLength());
+       }
+
        // 
------------------------------------------------------------------------
        //  utilities
        // 
------------------------------------------------------------------------
 
        private static void validateBasicCredentials(FileSystem fs) throws 
Exception {
-               assertTrue(fs instanceof HadoopFileSystem);
+               assertTrue(fs instanceof S3PrestoFileSystem);
 
-               org.apache.hadoop.fs.FileSystem hadoopFs = ((HadoopFileSystem) 
fs).getHadoopFileSystem();
+               org.apache.hadoop.fs.FileSystem hadoopFs = 
((S3PrestoFileSystem) fs).getHadoopFileSystem();
                assertTrue(hadoopFs instanceof PrestoS3FileSystem);
 
                try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) 
hadoopFs) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
index 054a98cc511..ec9f0c223f0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileBasedStateOutputStream.java
@@ -28,7 +28,6 @@
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.io.IOException;
 
@@ -38,7 +37,7 @@
  * A {@link CheckpointStateOutputStream} that writes into a specified file and
  * returns a {@link FileStateHandle} upon closing.
  *
- * <p>Unlike the {@link 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
+ * <p>Unlike the {@link 
FsCheckpointStreamFactory.FsCheckpointStateOutputStream},
  * this stream does not have a threshold below which it returns a memory byte 
stream handle,
  * and does not create random files, but writes to a specified file.
  */
@@ -116,7 +115,6 @@ public void close() {
                }
        }
 
-       @Nullable
        @Override
        public FileStateHandle closeAndGetHandle() throws IOException {
                synchronized (this) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index af80af7ac7f..1549e010fba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -32,6 +32,7 @@
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * An implementation of durable checkpoint storage to file systems.
@@ -54,11 +55,25 @@ public FsCheckpointStorage(
                        JobID jobId,
                        int fileSizeThreshold) throws IOException {
 
+               this(checkpointBaseDirectory.getFileSystem(),
+                               checkpointBaseDirectory,
+                               defaultSavepointDirectory,
+                               jobId,
+                               fileSizeThreshold);
+       }
+
+       public FsCheckpointStorage(
+                       FileSystem fs,
+                       Path checkpointBaseDirectory,
+                       @Nullable Path defaultSavepointDirectory,
+                       JobID jobId,
+                       int fileSizeThreshold) throws IOException {
+
                super(jobId, defaultSavepointDirectory);
 
                checkArgument(fileSizeThreshold >= 0);
 
-               this.fileSystem = checkpointBaseDirectory.getFileSystem();
+               this.fileSystem = checkNotNull(fs);
                this.checkpointsDirectory = 
getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
                this.sharedStateDirectory = new Path(checkpointsDirectory, 
CHECKPOINT_SHARED_STATE_DIR);
                this.taskOwnedStateDirectory = new Path(checkpointsDirectory, 
CHECKPOINT_TASK_OWNED_STATE_DIR);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 609ef698530..fd4e2b60720 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -21,6 +21,7 @@
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -57,19 +58,19 @@
  * <p>For example many S3 file systems (like Hadoop's s3a) use HTTP HEAD 
requests to check
  * for the existence of a directory. S3 sometimes limits the number of HTTP 
HEAD requests to
  * a few hundred per second only. Those numbers are easily reached by 
moderately large setups.
- * Surprisingly (and fortunately), the actual state writing (POST) have much 
higher quotas.
+ * Surprisingly (and fortunately), the actual state writing (PUT) has much higher quotas.
  */
 public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(FsCheckpointStreamFactory.class);
 
-       /** Maximum size of state that is stored with the metadata, rather than 
in files */
+       /** Maximum size of state that is stored with the metadata, rather than 
in files. */
        public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
 
-       /** Default size for the write buffer */
+       /** Default size for the write buffer. */
        public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
 
-       /** State below this size will be stored as part of the metadata, 
rather than in files */
+       /** State below this size will be stored as part of the metadata, 
rather than in files. */
        private final int fileStateThreshold;
 
        /** The directory for checkpoint exclusive state data. */
@@ -118,9 +119,7 @@ public FsCheckpointStreamFactory(
 
        @Override
        public FsCheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
IOException {
-
-
-               Path target = scope == CheckpointedStateScope.EXCLUSIVE 
?checkpointDirectory: sharedStateDirectory;
+               Path target = scope == CheckpointedStateScope.EXCLUSIVE ? 
checkpointDirectory : sharedStateDirectory;
                int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, 
fileStateThreshold);
 
                return new FsCheckpointStateOutputStream(target, filesystem, 
bufferSize, fileStateThreshold);
@@ -151,7 +150,7 @@ public String toString() {
                private int pos;
 
                private FSDataOutputStream outStream;
-               
+
                private final int localStateThreshold;
 
                private final Path basePath;
@@ -164,8 +163,8 @@ public String toString() {
 
                public FsCheckpointStateOutputStream(
                                        Path basePath, FileSystem fs,
-                                       int bufferSize, int localStateThreshold)
-               {
+                                       int bufferSize, int 
localStateThreshold) {
+
                        if (bufferSize < localStateThreshold) {
                                throw new IllegalArgumentException();
                        }
@@ -199,7 +198,7 @@ public void write(byte[] b, int off, int len) throws 
IOException {
                                        // flush the write buffer to make it 
clear again
                                        flush();
                                }
-                               
+
                                // copy what is in the buffer
                                System.arraycopy(b, off, writeBuffer, pos, len);
                                pos += len;
@@ -300,7 +299,7 @@ public StreamStateHandle closeAndGetHandle() throws 
IOException {
                                                        flush();
 
                                                        pos = 
writeBuffer.length;
-                                               
+
                                                        long size = -1L;
 
                                                        // make a best effort 
attempt to figure out the size
@@ -345,7 +344,11 @@ private void createStream() throws IOException {
                        for (int attempt = 0; attempt < 10; attempt++) {
                                try {
                                        Path statePath = createStatePath();
-                                       FSDataOutputStream outStream = 
fs.create(statePath, FileSystem.WriteMode.NO_OVERWRITE);
+                                       FSDataOutputStream outStream = 
fs.create(
+                                                       statePath,
+                                                       new WriteOptions()
+                                                                       
.setOverwrite(FileSystem.WriteMode.NO_OVERWRITE)
+                                                                       
.setInjectEntropy(true));
 
                                        // success, managed to open the stream
                                        this.statePath = statePath;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
index 8bafdf729af..c9621463cc7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -138,13 +139,13 @@ public void testGetPos() throws Exception {
         */
        @Test
        public void testCleanupWhenClosingStream() throws IOException {
-
                final FileSystem fs = mock(FileSystem.class);
                final FSDataOutputStream outputStream = 
mock(FSDataOutputStream.class);
 
                final ArgumentCaptor<Path> pathCaptor = 
ArgumentCaptor.forClass(Path.class);
 
                when(fs.create(pathCaptor.capture(), 
any(FileSystem.WriteMode.class))).thenReturn(outputStream);
+               when(fs.create(pathCaptor.capture(), 
any(WriteOptions.class))).thenReturn(outputStream);
 
                CheckpointStreamFactory.CheckpointStateOutputStream stream = 
new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
                        Path.fromLocalFile(tempDir.newFolder()),
@@ -154,9 +155,6 @@ public void testCleanupWhenClosingStream() throws 
IOException {
 
                // this should create the underlying file stream
                stream.write(new byte[] {1, 2, 3, 4, 5});
-
-               verify(fs).create(any(Path.class), 
any(FileSystem.WriteMode.class));
-
                stream.close();
 
                verify(fs).delete(eq(pathCaptor.getValue()), anyBoolean());
@@ -170,9 +168,11 @@ public void testCleanupWhenFailingCloseAndGetHandle() 
throws IOException {
                final FileSystem fs = mock(FileSystem.class);
                final FSDataOutputStream outputStream = 
mock(FSDataOutputStream.class);
 
-               final ArgumentCaptor<Path>  pathCaptor = 
ArgumentCaptor.forClass(Path.class);
+               final ArgumentCaptor<Path> pathCaptor = 
ArgumentCaptor.forClass(Path.class);
 
                when(fs.create(pathCaptor.capture(), any(FileSystem.WriteMode.class))).thenReturn(outputStream);
+               when(fs.create(pathCaptor.capture(), any(WriteOptions.class))).thenReturn(outputStream);
+
+
                doThrow(new IOException("Test IOException.")).when(outputStream).close();
 
                CheckpointStreamFactory.CheckpointStateOutputStream stream = new FsCheckpointStreamFactory.FsCheckpointStateOutputStream(
@@ -184,8 +184,6 @@ public void testCleanupWhenFailingCloseAndGetHandle() throws IOException {
                // this should create the underlying file stream
                stream.write(new byte[] {1, 2, 3, 4, 5});
 
-               verify(fs).create(any(Path.class), any(FileSystem.WriteMode.class));
-
                try {
                        stream.closeAndGetHandle();
                        fail("Expected IOException");
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
new file mode 100644
index 00000000000..d1aa11844dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.WriteOptions;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.ArgumentCaptor;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests verifying that the FsStateBackend passes the entropy injection option
+ * to the FileSystem for state payload files, but not for metadata files.
+ */
+public class FsStateBackendEntropyTest {
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       @After
+       public void resetFileSystems() throws Exception {
+               FileSystem.initialize(new Configuration());
+       }
+
+       @Test
+       public void testInjection() throws Exception {
+               FileSystem fs = spy(LocalFileSystem.getSharedInstance());
+       ArgumentCaptor<WriteOptions> optionsCaptor = ArgumentCaptor.forClass(WriteOptions.class);
+
+               Path checkpointDir = Path.fromLocalFile(tmp.newFolder());
+
+               FsCheckpointStorage storage = new FsCheckpointStorage(
+                               fs, checkpointDir, null, new JobID(), 1024);
+
+       CheckpointStorageLocation location = storage.initializeLocationForCheckpoint(96562);
+
+               // check entropy in task-owned state
+       try (CheckpointStateOutputStream stream = storage.createTaskOwnedStateStream()) {
+                       stream.flush();
+               }
+
+       verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
+               assertTrue(optionsCaptor.getValue().isInjectEntropy());
+               reset(fs);
+
+               // check entropy in the exclusive/shared state
+       try (CheckpointStateOutputStream stream =
+                       location.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE)) {
+
+                       stream.flush();
+               }
+
+       verify(fs, times(1)).create(any(Path.class), optionsCaptor.capture());
+               assertTrue(optionsCaptor.getValue().isInjectEntropy());
+               reset(fs);
+
+       // check that no entropy is injected for the checkpoint metadata file
+       try (CheckpointMetadataOutputStream stream = location.createMetadataOutputStream()) {
+                       stream.flush();
+               }
+
+       verify(fs, times(0)).create(any(Path.class), any(WriteOptions.class));
+       }
+}
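
Note for readers of the diff above: the sketch below is purely illustrative and is not part of this pull request. It shows one way a FileSystem implementation could react to the new WriteOptions#isInjectEntropy() flag by salting the file name before creating the stream. The class name EntropyAwareWriter and the random-segment scheme are invented for this example; only WriteOptions#isInjectEntropy() and FileSystem#create(Path, WriteMode) are taken from the actual change.

    import java.io.IOException;
    import java.util.concurrent.ThreadLocalRandom;

    import org.apache.flink.core.fs.FSDataOutputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.core.fs.WriteOptions;

    /** Hypothetical helper, not part of the PR: salts the path when entropy is requested. */
    public final class EntropyAwareWriter {

        private static final String ALPHABET = "0123456789abcdef";

        /** Creates the stream, salting the file name when the options ask for entropy. */
        public static FSDataOutputStream create(FileSystem fs, Path path, WriteOptions options) throws IOException {
            // assumes the path has a parent directory
            Path effectivePath = options.isInjectEntropy()
                    ? new Path(path.getParent(), randomSegment(4) + '-' + path.getName())
                    : path;
            // a real implementation would derive the write mode from the options;
            // NO_OVERWRITE is hard-coded here to keep the sketch small
            return fs.create(effectivePath, FileSystem.WriteMode.NO_OVERWRITE);
        }

        /** Returns a short random hex string, e.g. "3fa9". */
        private static String randomSegment(int length) {
            StringBuilder sb = new StringBuilder(length);
            ThreadLocalRandom rnd = ThreadLocalRandom.current();
            for (int i = 0; i < length; i++) {
                sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
            }
            return sb.toString();
        }
    }

Whether the entropy ends up as a prefix of the whole key (as the S3 guidance suggests) or only salts the file name is left to the concrete file system; the flag merely signals that randomization is allowed for that path.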


 


> add entropy to s3 path for better scalability
> ---------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.5.0, 1.4.2
>            Reporter: Jamie Grier
>            Assignee: Indrajit Roychoudhury
>            Priority: Critical
>              Labels: pull-request-available
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
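
As a rough illustration of the path rewrite sketched in the description above (hash-prefixing the checkpoint directory so S3 spreads the keys across partitions), a minimal standalone example follows. It is not the mechanism the pull request implements; the class name HashPrefixedPath and the 16-bit hash are arbitrary choices for the sketch.

    import java.net.URI;

    /** Illustrative only: prepends a short hash of the original key to the bucket path. */
    public final class HashPrefixedPath {

        /**
         * Rewrites e.g. "s3://bucket/flink/checkpoints" to
         * "s3://bucket/4af1/flink/checkpoints" using a hash of the original key.
         */
        public static String addHashPrefix(String checkpointUri) {
            URI uri = URI.create(checkpointUri);
            String key = uri.getPath();                                    // "/flink/checkpoints"
            String hash = Integer.toHexString(key.hashCode() & 0xFFFF);    // up to 4 hex chars of entropy
            return uri.getScheme() + "://" + uri.getAuthority() + "/" + hash + key;
        }

        public static void main(String[] args) {
            System.out.println(addHashPrefix("s3://bucket/flink/checkpoints"));
        }
    }

Because the hash is derived from the original path, the rewrite is deterministic: the same checkpoint directory always maps to the same prefixed location, so it can be located again on recovery.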


