[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675258#comment-16675258 ]

ASF GitHub Bot commented on FLINK-9061:
---------------------------------------

GJL closed pull request #6302: [FLINK-9061][checkpointing] add entropy to s3 path for better scalability
URL: https://github.com/apache/flink/pull/6302
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6ec7ec836c0..06d007dbbb9 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -179,6 +179,26 @@ public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheck
                this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing);
        }
 
+       /**
+        * Creates a new entropy based {@code RocksDBStateBackend} that stores its checkpoint data in the
+        * file system and location defined by the given URI.
+        *
+        * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system
+        * host and port in the URI, or have the Hadoop configuration that describes the file system
+        * (host / high-availability group / possibly credentials) either referenced from the Flink
+        * config, or included in the classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory.
+        * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
+        * @param entropyInjectionKey The string that specifies entropy key in the checkpoint uri
+        *
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        */
+       public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing, String entropyInjectionKey) throws IOException {
+               this(new FsStateBackend(checkpointDataUri, entropyInjectionKey),
+                       enableIncrementalCheckpointing);
+       }
+
        /**
         * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its
         * checkpoint data streams. Typically, one would supply a filesystem or database state backend
@@ -209,6 +229,7 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean
                this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        }
 
+
        // ------------------------------------------------------------------------
        //  State backend methods
        // ------------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
index f0569b8d61e..be8999e6e2b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
@@ -44,12 +44,16 @@
        /** The key under which the config stores the directory where RocksDB should be stored. */
        public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir";
 
+       /** The key defines the pattern in checkpoint uri where entropy value should be injected or substituted. */
+       public static final String CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY = "state.backend.fs.checkpointdir.injectEntropy.key";
+
        @Override
        public RocksDBStateBackend createFromConfig(Configuration config)
                        throws IllegalConfigurationException, IOException {
 
                final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
                final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null);
+               final String entropyInjectionKey = config.getString(CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY, "__ENTROPY_KEY__");
 
                if (checkpointDirURI == null) {
                        throw new IllegalConfigurationException(
@@ -59,7 +63,8 @@ public RocksDBStateBackend createFromConfig(Configuration config)
 
                try {
                        Path path = new Path(checkpointDirURI);
-                       RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri());
+                       RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri(),
+                               true, entropyInjectionKey);
                        if (rocksdbLocalPath != null) {
                                String[] directories = rocksdbLocalPath.split(",|" + File.pathSeparator);
                                backend.setDbStoragePaths(directories);
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index a8c583bbcd6..bf078d4271c 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -102,7 +102,7 @@ under the License.
                        <groupId>org.apache.commons</groupId>
                        <artifactId>commons-lang3</artifactId>
                </dependency>
-               
+
                <dependency>
                        <groupId>commons-cli</groupId>
                        <artifactId>commons-cli</artifactId>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index a8246518a65..2d6d470c8c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import java.net.URISyntaxException;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
@@ -63,6 +66,9 @@
        /** Cached handle to the file system for file operations */
        private final FileSystem filesystem;
 
+       /** Default random entropy string length*/
+       private static final int DEFAULT_RANDOM_ENTROPY_LENGTH = 4;
+
        /**
         * Creates a new state backend that stores its checkpoint data in the file system and location
         * defined by the given URI.
@@ -84,7 +90,8 @@
        public FsCheckpointStreamFactory(
                        Path checkpointDataUri,
                        JobID jobId,
-                       int fileStateSizeThreshold) throws IOException {
+                       int fileStateSizeThreshold,
+                       String entropyInjectionKey) throws IOException {
 
                if (fileStateSizeThreshold < 0) {
                        throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
@@ -98,7 +105,7 @@ public FsCheckpointStreamFactory(
                Path basePath = checkpointDataUri;
                filesystem = basePath.getFileSystem();
 
-               checkpointDirectory = createBasePath(filesystem, basePath, jobId);
+               checkpointDirectory = createBasePath(filesystem, basePath, jobId, entropyInjectionKey);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Initialed file stream factory to URI {}.", checkpointDirectory);
@@ -127,12 +134,47 @@ private void checkFileSystemInitialized() throws IllegalStateException {
                }
        }
 
-       protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
-               Path dir = new Path(checkpointDirectory, jobID.toString());
+       protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID,
+                                                                String entropyInjectionKey) throws IOException {
+               Path checkpointDirectoryWithEntropy;
+               try {
+                       checkpointDirectoryWithEntropy = injectEntropy(checkpointDirectory, entropyInjectionKey);
+               }
+               catch(URISyntaxException ex) {
+                       throw new IOException("URI error occurred while injecting entropy to checkpoint path.", ex);
+               }
+               Path dir = new Path(checkpointDirectoryWithEntropy, jobID.toString());
                fs.mkdirs(dir);
                return dir;
        }
 
+       protected Path injectEntropy(Path basePath, String entropyInjectionKey)
+               throws URISyntaxException {
+
+               final URI originalUri = basePath.toUri();
+               String chkpointPath = originalUri.getPath();
+               String retPath = chkpointPath;
+
+               if (!entropyInjectionKey.isEmpty()) {
+                       if (chkpointPath.contains(entropyInjectionKey)) {
+                               final String randomChars = RandomStringUtils.randomAlphanumeric(DEFAULT_RANDOM_ENTROPY_LENGTH);
+                               retPath = chkpointPath.replaceAll(entropyInjectionKey, randomChars);
+                               LOG.debug("Entropy injected checkpoint path {}", retPath);
+                       } else {
+                               LOG.warn("Entropy inject key is non-empty: {} , however key is not found in checkpoint path : {}", entropyInjectionKey, chkpointPath);
+                       }
+               }
+               URI stateUri = new URI(originalUri.getScheme(), originalUri.getAuthority(),
+                       retPath, originalUri.getQuery(), originalUri.getFragment());
+
+               return new Path(stateUri);
+       }
+
+       @VisibleForTesting
+       public Path getCheckpointDirectory() {
+               return checkpointDirectory;
+       }
+
        protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
                return new Path(checkpointDirectory, "chk-" + checkpointID);
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
index 7410d2dd971..2b9da7ba1dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -39,11 +39,12 @@ public FsSavepointStreamFactory(
                        JobID jobId,
                        int fileStateSizeThreshold) throws IOException {
 
-               super(checkpointDataUri, jobId, fileStateSizeThreshold);
+               super(checkpointDataUri, jobId, fileStateSizeThreshold,"");
        }
 
        @Override
-       protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+       protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID,
+                                                                 String entropyInjectionKey) throws IOException {
                // No checkpoint specific directory required as the savepoint directory
                // is already unique.
                return checkpointDirectory;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 952988f72d5..9d7d3ce3070 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import java.util.regex.Pattern;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.FileSystem;
@@ -67,6 +68,12 @@
        /** Switch to chose between synchronous and asynchronous snapshots */
        private final boolean asynchronousSnapshots;
 
+       /** invalid chars for the entropy key */
+       private static final Pattern INVALID_ENTROPY_KEY_CHARS_PATTERN = Pattern.compile("^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$");
+
+       /** Entropy injection key */
+       private final String entropyInjectionKey;
+
        /**
         * Creates a new state backend that stores its checkpoint data in the file system and location
         * defined by the given URI.
@@ -211,6 +218,67 @@ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws
                this(checkpointDataUri, fileStateSizeThreshold, true);
        }
 
+       /**
+        * Creates a new entropy based state backend that stores its checkpoint data in the file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+        * (host and port), or that the Hadoop configuration that describes that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+        *                          and the path to the checkpoint data directory.
+        * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
+        *                             rather than in files
+        * @param entropyInjectionKey String that identifies the entropy key in the checkpoint uri
+        *
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
+        */
+       public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold,
+                                                 String entropyInjectionKey) throws IOException {
+
+               checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be " +
+                       "zero or larger.");
+               checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD,
+                       "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD);
+
+               checkArgument(!INVALID_ENTROPY_KEY_CHARS_PATTERN.matcher(entropyInjectionKey).matches());
+
+               this.fileStateThreshold = fileStateSizeThreshold;
+               this.basePath = validateAndNormalizeUri(checkpointDataUri);
+
+               this.asynchronousSnapshots = true;
+               this.entropyInjectionKey = entropyInjectionKey;
+       }
+
+       /**
+        * Creates a new entropy based state backend that stores its checkpoint data in the file system and location
+        * defined by the given URI.
+        *
+        * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
+        * must be accessible via {@link FileSystem#get(URI)}.
+        *
+        * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority
+        * (host and port), or that the Hadoop configuration that describes that information must be in the
+        * classpath.
+        *
+        * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
+        *                          and the path to the checkpoint data directory.
+        * @param entropyInjectionKey String that identifies the entropy key in the checkpoint uri
+        *
+        * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
+        * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds.
+        */
+
+       public FsStateBackend(URI checkpointDataUri,
+                                                 String entropyInjectionKey) throws IOException {
+               this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, entropyInjectionKey);
+       }
+
        /**
         * Creates a new state backend that stores its checkpoint data in the file system and location
         * defined by the given URI.
@@ -243,6 +311,7 @@ public FsStateBackend(
                this.basePath = validateAndNormalizeUri(checkpointDataUri);
 
                this.asynchronousSnapshots = asynchronousSnapshots;
+               this.entropyInjectionKey = "";
        }
 
        /**
@@ -274,7 +343,8 @@ public int getMinFileSizeThreshold() {
 
        @Override
        public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
-               return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold);
+               return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold,
+                       entropyInjectionKey);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
index 4c933ef9543..1625a58b2aa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
@@ -38,6 +38,10 @@
         * rather than in files */
        public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold";
 
+       /** The key defines the pattern in checkpoint uri where entropy value should be injected or substituted */
+       public static final String CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY = "state.backend.fs.checkpointdir.injectEntropy.key";
+
+
 
        @Override
        public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException {
@@ -45,6 +49,9 @@ public FsStateBackend createFromConfig(Configuration config) throws IllegalConfi
                final int memoryThreshold = config.getInteger(
                        MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD);
 
+               final String entropyInjectionKey = config.getString(CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY, "__ENTROPY_KEY__");
+
+
                if (checkpointDirURI == null) {
                        throw new IllegalConfigurationException(
                                        "Cannot create the file system state backend: The configuration does not specify the " +
@@ -53,7 +60,7 @@ public FsStateBackend createFromConfig(Configuration config) throws IllegalConfi
 
                try {
                        Path path = new Path(checkpointDirURI);
-                       return new FsStateBackend(path.toUri(), memoryThreshold);
+                       return new FsStateBackend(path.toUri(), memoryThreshold, entropyInjectionKey);
                }
                catch (IOException | IllegalArgumentException e) {
                        throw new IllegalConfigurationException("Invalid configuration for the state backend", e);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 6be23437d88..799072106ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.runtime.state;
 
+import java.nio.file.FileSystems;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.junit.Ignore;
@@ -161,6 +164,171 @@ public void testStateOutputStream() throws IOException {
                }
        }
 
+       @Test
+       public void testEntropyStateOutputStream() throws IOException {
+               File basePath = tempFolder.newFolder().getAbsoluteFile();
+               final String path = basePath.getAbsolutePath();
+               final String entropyKey = "_ENTROPY_";
+
+               final String pathWithEntropy = path + FileSystems.getDefault().getSeparator() + entropyKey;
+
+               try {
+                       FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(new File(pathWithEntropy).toURI(), 15, entropyKey));
+                       JobID jobId = new JobID();
+
+                       // we know how FsCheckpointStreamFactory is implemented so we know where it
+                       // will store checkpoints
+                       File checkpointPath = new File(pathWithEntropy, jobId.toString());
+
+                       CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_entropy");
+
+                       byte[] state1 = new byte[1274673];
+                       byte[] state2 = new byte[1];
+                       byte[] state3 = new byte[0];
+                       byte[] state4 = new byte[177];
+
+                       Random rnd = new Random();
+                       rnd.nextBytes(state1);
+                       rnd.nextBytes(state2);
+                       rnd.nextBytes(state3);
+                       rnd.nextBytes(state4);
+
+                       long checkpointId = 97231523453L;
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream stream1 =
+                               streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream stream2 =
+                               streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+                       CheckpointStreamFactory.CheckpointStateOutputStream stream3 =
+                               streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+
+                       stream1.write(state1);
+                       stream2.write(state2);
+                       stream3.write(state3);
+
+                       FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle();
+                       ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle();
+                       ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle();
+
+                       // use with try-with-resources
+                       StreamStateHandle handle4;
+                       try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 =
+                                        streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+                               stream4.write(state4);
+                               handle4 = stream4.closeAndGetHandle();
+                       }
+
+                       // close before accessing handle
+                       CheckpointStreamFactory.CheckpointStateOutputStream stream5 =
+                               streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+                       stream5.write(state4);
+                       stream5.close();
+                       try {
+                               stream5.closeAndGetHandle();
+                               fail();
+                       } catch (IOException e) {
+                               // uh-huh
+                       }
+
+                       validateBytesInStream(handle1.openInputStream(), state1);
+                       handle1.discardState();
+                       assertFalse(isDirectoryEmpty(basePath));
+                       ensureLocalFileDeleted(handle1.getFilePath());
+
+                       validateBytesInStream(handle2.openInputStream(), state2);
+                       handle2.discardState();
+
+                       // nothing was written to the stream, so it will return nothing
+                       assertNull(handle3);
+
+                       validateBytesInStream(handle4.openInputStream(), state4);
+                       handle4.discardState();
+                       assertTrue(isDirectoryEmpty(checkpointPath));
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testEntropyCheckpointStream() throws Exception {
+               File basePath = tempFolder.newFolder().getAbsoluteFile();
+               {
+                       final String path = basePath.getAbsolutePath();
+                       final String entropyKey = "_ENTROPY_";
+
+                       verifyEntropy(path, entropyKey, false);
+               }
+
+               {
+                       final String path = basePath.getAbsolutePath();
+                       final String entropyKey = "";
+
+                       verifyEntropy(path, entropyKey, false);
+               }
+       }
+
+       @Test
+       public void testEntropyInvalidCharsCheckpointStream() throws Exception {
+               File basePath = tempFolder.newFolder().getAbsoluteFile();
+               {
+                       final String path = basePath.getAbsolutePath();
+                       final String entropyKey = "#$**!^\\";
+
+                       verifyEntropy(path, entropyKey, true);
+               }
+
+               {
+                       final String path = basePath.getAbsolutePath();
+                       final String entropyKey = "//\\";
+
+                       verifyEntropy(path, entropyKey, true);
+               }
+       }
+
+       private void verifyEntropy(String path, String entropyKey, boolean invalid) {
+               //add the entropy pattern at the end of the path
+               final String pathWithEntropy = path + FileSystems.getDefault().getSeparator() + entropyKey;
+
+               try {
+                       // the state backend has a very low in-mem state threshold (15 bytes)
+                       FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(new File(pathWithEntropy).toURI(), 15, entropyKey));
+                       JobID jobId = new JobID();
+
+                       // we know how FsCheckpointStreamFactory is implemented so we know where it
+                       // will store checkpoints
+                       File checkpointPath = new File(pathWithEntropy, jobId.toString());
+
+                       CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_entropy");
+                       final Path chkpointPath = ((FsCheckpointStreamFactory)streamFactory).getCheckpointDirectory();
+
+                       if (entropyKey == null || entropyKey.isEmpty()) {
+                               assert(true);
+                               return;
+                       }
+                       //check entropyKey got replaced by the actual 4 character random alphanumeric string
+                       assert(!chkpointPath.getPath().contains(entropyKey));
+                       final String[] segments = chkpointPath.getPath().split(FileSystems.getDefault().getSeparator());
+
+                       //jobid gets suffixed after the entropy
+                       final String entropy = segments[segments.length - 2];
+                       assert(entropy.length() == 4);
+                       assert(StringUtils.isAlphanumeric(entropy));
+
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       if (!invalid) {
+                               fail(e.getMessage());
+                       } else {
+                               assert(true);
+                       }
+               }
+       }
+
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------

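In FsCheckpointStreamFactory.injectEntropy above, every occurrence of the configured key in the checkpoint URI's path is replaced by one 4-character random alphanumeric string, and the URI is rebuilt from the original scheme, authority, query, and fragment; an empty key, or a key that does not occur in the path, leaves the path unchanged. The following is a minimal standalone sketch of that logic, not the Flink code: it uses only java.net.URI and java.util.Random in place of Flink's Path and commons-lang3's RandomStringUtils, and the class name EntropyInjectionSketch is hypothetical. One deliberate difference: it uses the literal String.replace instead of replaceAll, because replaceAll interprets its first argument as a regular expression, so a key containing characters such as `.` or `$` would not be matched literally.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;

/** Hypothetical standalone sketch of entropy injection into a checkpoint URI path. */
public class EntropyInjectionSketch {

    // Alphabet for the random replacement (ASCII letters and digits).
    private static final String ALPHANUMERIC =
            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

    // Same length as DEFAULT_RANDOM_ENTROPY_LENGTH in the diff.
    static final int ENTROPY_LENGTH = 4;

    /** Builds a random string of the given length from ALPHANUMERIC. */
    static String randomAlphanumeric(Random rnd, int length) {
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHANUMERIC.charAt(rnd.nextInt(ALPHANUMERIC.length())));
        }
        return sb.toString();
    }

    /**
     * Replaces every occurrence of entropyKey in the URI path with one random string.
     * An empty key, or a key that does not occur in the path, leaves the path unchanged.
     */
    static URI injectEntropy(URI original, String entropyKey, Random rnd)
            throws URISyntaxException {
        String path = original.getPath();
        if (!entropyKey.isEmpty() && path.contains(entropyKey)) {
            // Literal match: unlike String.replaceAll, the key is not treated as a regex.
            path = path.replace(entropyKey, randomAlphanumeric(rnd, ENTROPY_LENGTH));
        }
        // Rebuild the URI with the original scheme, authority, query and fragment.
        return new URI(original.getScheme(), original.getAuthority(), path,
                original.getQuery(), original.getFragment());
    }

    public static void main(String[] args) throws URISyntaxException {
        URI configured = new URI("s3://bucket/__ENTROPY_KEY__/flink/checkpoints");
        URI injected = injectEntropy(configured, "__ENTROPY_KEY__", new Random());
        // Prints the URI with a random 4-character segment in place of the key.
        System.out.println(injected);
    }
}
```

With the default key from the factories, a checkpoint directory such as s3://bucket/__ENTROPY_KEY__/flink/checkpoints becomes s3://bucket/<4 random characters>/flink/checkpoints, and createBasePath then appends the job ID directory under it.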

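The new FsStateBackend constructor rejects an entropy key via checkArgument(!INVALID_ENTROPY_KEY_CHARS_PATTERN.matcher(entropyInjectionKey).matches()), so a key containing any of ~ # @ * + % { } < > [ ] | " or a backslash fails with an IllegalArgumentException, which FsStateBackendFactory wraps in an IllegalConfigurationException. A small sketch that reuses the same regular expression outside Flink; the names EntropyKeyValidation and isValidEntropyKey are hypothetical.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of the entropy-key check performed by the new FsStateBackend constructor. */
public class EntropyKeyValidation {

    // Same regular expression as INVALID_ENTROPY_KEY_CHARS_PATTERN in the diff:
    // a key is invalid if it contains any of ~ # @ * + % { } < > [ ] | " or a backslash.
    private static final Pattern INVALID_ENTROPY_KEY_CHARS_PATTERN =
            Pattern.compile("^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$");

    /** Returns true if the key contains none of the rejected characters. */
    static boolean isValidEntropyKey(String key) {
        return !INVALID_ENTROPY_KEY_CHARS_PATTERN.matcher(key).matches();
    }

    public static void main(String[] args) {
        // Keys taken from the factory default and from the tests in the diff.
        String[] keys = {"__ENTROPY_KEY__", "_ENTROPY_", "", "#$**!^\\", "//\\"};
        for (String key : keys) {
            System.out.println("'" + key + "' valid=" + isValidEntropyKey(key));
        }
    }
}
```

The pattern does not reject `/`, so the test key "//\\" (the string //\) in testEntropyInvalidCharsCheckpointStream is rejected only because of its trailing backslash.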
 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> Add entropy to s3 path for better scalability
> ---------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2, 1.5.0
>            Reporter: Jamie Grier
>            Assignee: Indrajit Roychoudhury
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.2, 1.7.0
>
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we are writing all the
> checkpoint data under a common key prefix. This is the worst-case scenario
> for S3 performance, since the key prefix is used as a partition key.
>  
> In the worst case, checkpoints fail with a 500 status code from S3 and an
> internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code
> that allows me to "rewrite" paths. For example, say I have the checkpoint
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load evenly across the S3 cluster.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Has anyone else hit this issue? Any other ideas for solutions? This is a
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
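
The issue description proposes a different rewrite from the one the PR merged: instead of replacing a marker in the path with random characters, prefix the path with a hash of the original path, turning s3://bucket/flink/checkpoints into s3://bucket/[HASH]/flink/checkpoints. A minimal sketch of that proposal, assuming SHA-256 truncated to 8 hex characters as the hash (the description does not name a hash function); HashPrefixRewrite and rewrite are hypothetical names, not a Flink API.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of the hash-prefix path rewrite proposed in the issue description. */
public class HashPrefixRewrite {

    // Number of hex characters of the digest used as the prefix (an assumption).
    static final int PREFIX_LENGTH = 8;

    /** Returns the first PREFIX_LENGTH hex characters of SHA-256(input). */
    static String hexPrefix(String input) throws NoSuchAlgorithmException {
        byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(input.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            hex.append(String.format("%02x", b));
        }
        return hex.substring(0, PREFIX_LENGTH);
    }

    /** Rewrites s3://bucket/some/path into s3://bucket/HASH/some/path. */
    static URI rewrite(URI original) throws URISyntaxException, NoSuchAlgorithmException {
        String path = original.getPath();
        String newPath = "/" + hexPrefix(path) + path;
        // Keep scheme (s3), authority (bucket), query and fragment unchanged.
        return new URI(original.getScheme(), original.getAuthority(), newPath,
                original.getQuery(), original.getFragment());
    }

    public static void main(String[] args) throws Exception {
        System.out.println(rewrite(new URI("s3://bucket/flink/checkpoints")));
    }
}
```

Because the hash is deterministic, one fixed checkpoint root always maps to the same prefix; the write load only spreads across prefixes if the hashed input differs between writers, for example per job or per state file. The merged PR sidesteps this by drawing fresh random characters each time createStreamFactory builds a new FsCheckpointStreamFactory.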



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
