[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675258#comment-16675258 ]
ASF GitHub Bot commented on FLINK-9061: --------------------------------------- GJL closed pull request #6302: [FLINK-9061][checkpointing] add entropy to s3 path for better scalability URL: https://github.com/apache/flink/pull/6302 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 6ec7ec836c0..06d007dbbb9 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -179,6 +179,26 @@ public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheck this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing); } + /** + * Creates a new entropy based {@code RocksDBStateBackend} that stores its checkpoint data in the + * file system and location defined by the given URI. + * + * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system + * host and port in the URI, or have the Hadoop configuration that describes the file system + * (host / high-availability group / possibly credentials) either referenced from the Flink + * config, or included in the classpath. + * + * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. + * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. 
+ * @param entropyInjectionKey The string that specifies entropy key in the checkpoint uri + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + */ + public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing, String entropyInjectionKey) throws IOException { + this(new FsStateBackend(checkpointDataUri, entropyInjectionKey), + enableIncrementalCheckpointing); + } + /** * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its * checkpoint data streams. Typically, one would supply a filesystem or database state backend @@ -209,6 +229,7 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; } + // ------------------------------------------------------------------------ // State backend methods // ------------------------------------------------------------------------ diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java index f0569b8d61e..be8999e6e2b 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java @@ -44,12 +44,16 @@ /** The key under which the config stores the directory where RocksDB should be stored. */ public static final String ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY = "state.backend.rocksdb.checkpointdir"; + /** The key defines the pattern in checkpoint uri where entropy value should be injected or substituted. 
*/ + public static final String CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY = "state.backend.fs.checkpointdir.injectEntropy.key"; + @Override public RocksDBStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException, IOException { final String checkpointDirURI = config.getString(CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); final String rocksdbLocalPath = config.getString(ROCKSDB_CHECKPOINT_DIRECTORY_URI_CONF_KEY, null); + final String entropyInjectionKey = config.getString(CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY, "__ENTROPY_KEY__"); if (checkpointDirURI == null) { throw new IllegalConfigurationException( @@ -59,7 +63,8 @@ public RocksDBStateBackend createFromConfig(Configuration config) try { Path path = new Path(checkpointDirURI); - RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri()); + RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri(), + true, entropyInjectionKey); if (rocksdbLocalPath != null) { String[] directories = rocksdbLocalPath.split(",|" + File.pathSeparator); backend.setDbStoragePaths(directories); diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index a8c583bbcd6..bf078d4271c 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -102,7 +102,7 @@ under the License. 
<groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> - + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java index a8246518a65..2d6d470c8c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.state.filesystem; +import java.net.URISyntaxException; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileSystem; @@ -63,6 +66,9 @@ /** Cached handle to the file system for file operations */ private final FileSystem filesystem; + /** Default random entropy string length*/ + private static final int DEFAULT_RANDOM_ENTROPY_LENGTH = 4; + /** * Creates a new state backend that stores its checkpoint data in the file system and location * defined by the given URI. 
@@ -84,7 +90,8 @@ public FsCheckpointStreamFactory( Path checkpointDataUri, JobID jobId, - int fileStateSizeThreshold) throws IOException { + int fileStateSizeThreshold, + String entropyInjectionKey) throws IOException { if (fileStateSizeThreshold < 0) { throw new IllegalArgumentException("The threshold for file state size must be zero or larger."); @@ -98,7 +105,7 @@ public FsCheckpointStreamFactory( Path basePath = checkpointDataUri; filesystem = basePath.getFileSystem(); - checkpointDirectory = createBasePath(filesystem, basePath, jobId); + checkpointDirectory = createBasePath(filesystem, basePath, jobId, entropyInjectionKey); if (LOG.isDebugEnabled()) { LOG.debug("Initialed file stream factory to URI {}.", checkpointDirectory); @@ -127,12 +134,47 @@ private void checkFileSystemInitialized() throws IllegalStateException { } } - protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException { - Path dir = new Path(checkpointDirectory, jobID.toString()); + protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID, + String entropyInjectionKey) throws IOException { + Path checkpointDirectoryWithEntropy; + try { + checkpointDirectoryWithEntropy = injectEntropy(checkpointDirectory, entropyInjectionKey); + } + catch(URISyntaxException ex) { + throw new IOException("URI error occurred while injecting entropy to checkpoint path.", ex); + } + Path dir = new Path(checkpointDirectoryWithEntropy, jobID.toString()); fs.mkdirs(dir); return dir; } + protected Path injectEntropy(Path basePath, String entropyInjectionKey) + throws URISyntaxException { + + final URI originalUri = basePath.toUri(); + String chkpointPath = originalUri.getPath(); + String retPath = chkpointPath; + + if (!entropyInjectionKey.isEmpty()) { + if (chkpointPath.contains(entropyInjectionKey)) { + final String randomChars = RandomStringUtils.randomAlphanumeric(DEFAULT_RANDOM_ENTROPY_LENGTH); + retPath = 
chkpointPath.replaceAll(entropyInjectionKey, randomChars); + LOG.debug("Entropy injected checkpoint path {}", retPath); + } else { + LOG.warn("Entropy inject key is non-empty: {} , however key is not found in checkpoint path : {}", entropyInjectionKey, chkpointPath); + } + } + URI stateUri = new URI(originalUri.getScheme(), originalUri.getAuthority(), + retPath, originalUri.getQuery(), originalUri.getFragment()); + + return new Path(stateUri); + } + + @VisibleForTesting + public Path getCheckpointDirectory() { + return checkpointDirectory; + } + protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) { return new Path(checkpointDirectory, "chk-" + checkpointID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java index 7410d2dd971..2b9da7ba1dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java @@ -39,11 +39,12 @@ public FsSavepointStreamFactory( JobID jobId, int fileStateSizeThreshold) throws IOException { - super(checkpointDataUri, jobId, fileStateSizeThreshold); + super(checkpointDataUri, jobId, fileStateSizeThreshold,""); } @Override - protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException { + protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID, + String entropyInjectionKey) throws IOException { // No checkpoint specific directory required as the savepoint directory // is already unique. 
return checkpointDirectory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index 952988f72d5..9d7d3ce3070 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state.filesystem; +import java.util.regex.Pattern; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.fs.FileSystem; @@ -67,6 +68,12 @@ /** Switch to chose between synchronous and asynchronous snapshots */ private final boolean asynchronousSnapshots; + /** invalid chars for the entropy key */ + private static final Pattern INVALID_ENTROPY_KEY_CHARS_PATTERN = Pattern.compile("^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$"); + + /** Entropy injection key */ + private final String entropyInjectionKey; + /** * Creates a new state backend that stores its checkpoint data in the file system and location * defined by the given URI. @@ -211,6 +218,67 @@ public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold) throws this(checkpointDataUri, fileStateSizeThreshold, true); } + /** + * Creates a new entropy based state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. 
+ * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. + * @param fileStateSizeThreshold State up to this size will be stored as part of the metadata, + * rather than in files + * @param entropyInjectionKey String that identifies the entropy key in the checkpoint uri + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds. + */ + public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, + String entropyInjectionKey) throws IOException { + + checkArgument(fileStateSizeThreshold >= 0, "The threshold for file state size must be " + + "zero or larger."); + checkArgument(fileStateSizeThreshold <= MAX_FILE_STATE_THRESHOLD, + "The threshold for file state size cannot be larger than %s", MAX_FILE_STATE_THRESHOLD); + + checkArgument(!INVALID_ENTROPY_KEY_CHARS_PATTERN.matcher(entropyInjectionKey).matches()); + + this.fileStateThreshold = fileStateSizeThreshold; + this.basePath = validateAndNormalizeUri(checkpointDataUri); + + this.asynchronousSnapshots = true; + this.entropyInjectionKey = entropyInjectionKey; + } + + /** + * Creates a new entropy based state backend that stores its checkpoint data in the file system and location + * defined by the given URI. + * + * <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://') + * must be accessible via {@link FileSystem#get(URI)}. + * + * <p>For a state backend targeting HDFS, this means that the URI must either specify the authority + * (host and port), or that the Hadoop configuration that describes that information must be in the + * classpath. + * + * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority), + * and the path to the checkpoint data directory. 
+ * @param entropyInjectionKey String that identifies the entropy key in the checkpoint uri + * + * @throws IOException Thrown, if no file system can be found for the scheme in the URI. + * @throws IllegalArgumentException Thrown, if the {@code fileStateSizeThreshold} is out of bounds. + */ + + public FsStateBackend(URI checkpointDataUri, + String entropyInjectionKey) throws IOException { + this(checkpointDataUri, DEFAULT_FILE_STATE_THRESHOLD, entropyInjectionKey); + } + /** * Creates a new state backend that stores its checkpoint data in the file system and location * defined by the given URI. @@ -243,6 +311,7 @@ public FsStateBackend( this.basePath = validateAndNormalizeUri(checkpointDataUri); this.asynchronousSnapshots = asynchronousSnapshots; + this.entropyInjectionKey = ""; } /** @@ -274,7 +343,8 @@ public int getMinFileSizeThreshold() { @Override public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { - return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold); + return new FsCheckpointStreamFactory(basePath, jobId, fileStateThreshold, + entropyInjectionKey); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java index 4c933ef9543..1625a58b2aa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java @@ -38,6 +38,10 @@ * rather than in files */ public static final String MEMORY_THRESHOLD_CONF_KEY = "state.backend.fs.memory-threshold"; + /** The key defines the pattern in checkpoint uri where entropy value should be injected or substituted */ + public static final String CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY = "state.backend.fs.checkpointdir.injectEntropy.key"; + + 
@Override public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException { @@ -45,6 +49,9 @@ public FsStateBackend createFromConfig(Configuration config) throws IllegalConfi final int memoryThreshold = config.getInteger( MEMORY_THRESHOLD_CONF_KEY, FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD); + final String entropyInjectionKey = config.getString(CHECKPOINT_DIRECTORY_ENTROPY_KEY_CONF_KEY, "__ENTROPY_KEY__"); + + if (checkpointDirURI == null) { throw new IllegalConfigurationException( "Cannot create the file system state backend: The configuration does not specify the " + @@ -53,7 +60,7 @@ public FsStateBackend createFromConfig(Configuration config) throws IllegalConfi try { Path path = new Path(checkpointDirURI); - return new FsStateBackend(path.toUri(), memoryThreshold); + return new FsStateBackend(path.toUri(), memoryThreshold, entropyInjectionKey); } catch (IOException | IllegalArgumentException e) { throw new IllegalConfigurationException("Invalid configuration for the state backend", e); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java index 6be23437d88..799072106ba 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java @@ -18,11 +18,14 @@ package org.apache.flink.runtime.state; +import java.nio.file.FileSystems; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import 
org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.junit.Ignore; @@ -161,6 +164,171 @@ public void testStateOutputStream() throws IOException { } } + @Test + public void testEntropyStateOutputStream() throws IOException { + File basePath = tempFolder.newFolder().getAbsoluteFile(); + final String path = basePath.getAbsolutePath(); + final String entropyKey = "_ENTROPY_"; + + final String pathWithEntropy = path + FileSystems.getDefault().getSeparator() + entropyKey; + + try { + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(new File(pathWithEntropy).toURI(), 15, entropyKey)); + JobID jobId = new JobID(); + + // we know how FsCheckpointStreamFactory is implemented so we know where it + // will store checkpoints + File checkpointPath = new File(pathWithEntropy, jobId.toString()); + + CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_entropy"); + + byte[] state1 = new byte[1274673]; + byte[] state2 = new byte[1]; + byte[] state3 = new byte[0]; + byte[] state4 = new byte[177]; + + Random rnd = new Random(); + rnd.nextBytes(state1); + rnd.nextBytes(state2); + rnd.nextBytes(state3); + rnd.nextBytes(state4); + + long checkpointId = 97231523453L; + + CheckpointStreamFactory.CheckpointStateOutputStream stream1 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + CheckpointStreamFactory.CheckpointStateOutputStream stream2 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + CheckpointStreamFactory.CheckpointStateOutputStream stream3 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + + stream1.write(state1); + stream2.write(state2); + stream3.write(state3); + + FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle(); + ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle(); + ByteStreamStateHandle handle3 = 
(ByteStreamStateHandle) stream3.closeAndGetHandle(); + + // use with try-with-resources + StreamStateHandle handle4; + try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { + stream4.write(state4); + handle4 = stream4.closeAndGetHandle(); + } + + // close before accessing handle + CheckpointStreamFactory.CheckpointStateOutputStream stream5 = + streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); + stream5.write(state4); + stream5.close(); + try { + stream5.closeAndGetHandle(); + fail(); + } catch (IOException e) { + // uh-huh + } + + validateBytesInStream(handle1.openInputStream(), state1); + handle1.discardState(); + assertFalse(isDirectoryEmpty(basePath)); + ensureLocalFileDeleted(handle1.getFilePath()); + + validateBytesInStream(handle2.openInputStream(), state2); + handle2.discardState(); + + // nothing was written to the stream, so it will return nothing + assertNull(handle3); + + validateBytesInStream(handle4.openInputStream(), state4); + handle4.discardState(); + assertTrue(isDirectoryEmpty(checkpointPath)); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testEntropyCheckpointStream() throws Exception { + File basePath = tempFolder.newFolder().getAbsoluteFile(); + { + final String path = basePath.getAbsolutePath(); + final String entropyKey = "_ENTROPY_"; + + verifyEntropy(path, entropyKey, false); + } + + { + final String path = basePath.getAbsolutePath(); + final String entropyKey = ""; + + verifyEntropy(path, entropyKey, false); + } + } + + @Test + public void testEntropyInvalidCharsCheckpointStream() throws Exception { + File basePath = tempFolder.newFolder().getAbsoluteFile(); + { + final String path = basePath.getAbsolutePath(); + final String entropyKey = "#$**!^\\"; + + verifyEntropy(path, entropyKey, true); + } + + { + final String path = 
basePath.getAbsolutePath(); + final String entropyKey = "//\\"; + + verifyEntropy(path, entropyKey, true); + } + } + + private void verifyEntropy(String path, String entropyKey, boolean invalid) { + //add the entropy pattern at the end of the path + final String pathWithEntropy = path + FileSystems.getDefault().getSeparator() + entropyKey; + + try { + // the state backend has a very low in-mem state threshold (15 bytes) + FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(new File(pathWithEntropy).toURI(), 15, entropyKey)); + JobID jobId = new JobID(); + + // we know how FsCheckpointStreamFactory is implemented so we know where it + // will store checkpoints + File checkpointPath = new File(pathWithEntropy, jobId.toString()); + + CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_entropy"); + final Path chkpointPath = ((FsCheckpointStreamFactory)streamFactory).getCheckpointDirectory(); + + if (entropyKey == null || entropyKey.isEmpty()) { + assert(true); + return; + } + //check entropyKey got replaced by the actual 4 character random alphanumeric string + assert(!chkpointPath.getPath().contains(entropyKey)); + final String[] segments = chkpointPath.getPath().split(FileSystems.getDefault().getSeparator()); + + //jobid gets suffixed after the entropy + final String entropy = segments[segments.length - 2]; + assert(entropy.length() == 4); + assert(StringUtils.isAlphanumeric(entropy)); + + } + catch (Exception e) { + e.printStackTrace(); + if (!invalid) { + fail(e.getMessage()); + } else { + assert(true); + } + } + } + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add entropy to s3 path for better scalability > --------------------------------------------- > > Key: FLINK-9061 > URL: https://issues.apache.org/jira/browse/FLINK-9061 > Project: Flink > Issue Type: Improvement > Components: FileSystem, State Backends, Checkpointing > Affects Versions: 1.4.2, 1.5.0 > Reporter: Jamie Grier > Assignee: Indrajit Roychoudhury > Priority: Critical > Labels: pull-request-available > Fix For: 1.6.2, 1.7.0 > > > I think we need to modify the way we write checkpoints to S3 for high-scale > jobs (those with many total tasks). The issue is that we are writing all the > checkpoint data under a common key prefix. This is the worst-case scenario > for S3 performance since the key is used as a partition key. > > In the worst case, checkpoints fail with a 500 status code coming back from S3 > and an internal error type of TooBusyException. > > One possible solution would be to add a hook in the Flink filesystem code > that allows me to "rewrite" paths. For example, say I have the checkpoint > directory set to: > > s3://bucket/flink/checkpoints > > I would hook that and rewrite that path to: > > s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original > path > > This would distribute the checkpoint write load around the S3 cluster evenly. > > For reference: > https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/ > > Has anyone else hit this issue? Any other ideas for solutions? This is a > pretty serious problem for people trying to checkpoint to S3. > > -Jamie > -- This message was sent by Atlassian JIRA (v7.6.3#76005)