This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
     new c2bc493  [FLINK-11379][core] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD
c2bc493 is described below

commit c2bc493cb38258281a98a80848370b3a5b5c01e8
Author: sunhaibotb <sunhaib...@163.com>
AuthorDate: Thu Feb 21 17:23:53 2019 +0800

    [FLINK-11379][core] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD

    This closes #7797
---
 .../main/java/org/apache/flink/util/FileUtils.java | 98 +++++++++++++++++++++-
 .../java/org/apache/flink/util/FileUtilsTest.java  | 85 +++++++++++++++++++
 .../deployment/TaskDeploymentDescriptor.java       |  6 +-
 3 files changed, 185 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
index 8f32262..c4d6df7 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FileUtils.java
@@ -28,11 +28,15 @@ import org.apache.flink.util.function.ThrowingConsumer;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -56,6 +60,15 @@ public final class FileUtils {
 	/** The length of the random part of the filename. */
 	private static final int RANDOM_FILE_NAME_LENGTH = 12;
 
+	/**
+	 * The maximum size of array to allocate for reading. See
+	 * {@link java.nio.file.Files#MAX_BUFFER_SIZE} for more.
+	 */
+	private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
+
+	/** The size of the buffer used for reading. */
+	private static final int BUFFER_SIZE = 4096;
+
 	// ------------------------------------------------------------------------
 
 	public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException {
@@ -90,7 +103,7 @@ public final class FileUtils {
 	// ------------------------------------------------------------------------
 
 	public static String readFile(File file, String charsetName) throws IOException {
-		byte[] bytes = Files.readAllBytes(file.toPath());
+		byte[] bytes = readAllBytes(file.toPath());
 		return new String(bytes, charsetName);
 	}
 
@@ -107,6 +120,89 @@ public final class FileUtils {
 		writeFile(file, contents, "UTF-8");
 	}
 
+	/**
+	 * Reads all the bytes from a file. The method ensures that the file is
+	 * closed when all bytes have been read or an I/O error, or other runtime
+	 * exception, is thrown.
+	 *
+	 * <p>This is an implementation that follows {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)};
+	 * the difference is that it limits the size of the direct buffer to avoid
+	 * a direct-buffer OutOfMemoryError. Once {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)}
+	 * or other JDK APIs can do this, this method should be removed.
+	 *
+	 * @param path
+	 *        the path to the file
+	 * @return a byte array containing the bytes read from the file
+	 *
+	 * @throws IOException
+	 *         if an I/O error occurs reading from the stream
+	 * @throws OutOfMemoryError
+	 *         if an array of the required size cannot be allocated, for
+	 *         example if the file is larger than {@code 2GB}
+	 */
+	public static byte[] readAllBytes(java.nio.file.Path path) throws IOException {
+		try (SeekableByteChannel channel = Files.newByteChannel(path);
+				InputStream in = Channels.newInputStream(channel)) {
+
+			long size = channel.size();
+			if (size > (long) MAX_BUFFER_SIZE) {
+				throw new OutOfMemoryError("Required array size too large");
+			}
+
+			return read(in, (int) size);
+		}
+	}
+
+	/**
+	 * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint
+	 * about how many bytes the stream will have and reads in chunks of at most
+	 * {@code BUFFER_SIZE} bytes to limit the size of the direct buffer used for reading.
+	 *
+	 * @param source
+	 *        the input stream to read from
+	 * @param initialSize
+	 *        the initial size of the byte array to allocate
+	 * @return a byte array containing the bytes read from the file
+	 *
+	 * @throws IOException
+	 *         if an I/O error occurs reading from the stream
+	 * @throws OutOfMemoryError
+	 *         if an array of the required size cannot be allocated
+	 */
+	private static byte[] read(InputStream source, int initialSize) throws IOException {
+		int capacity = initialSize;
+		byte[] buf = new byte[capacity];
+		int nread = 0;
+		int n;
+
+		for (;;) {
+			// read to EOF, which may read more or less than initialSize (e.g., the
+			// file is truncated while we are reading)
+			while ((n = source.read(buf, nread, Math.min(capacity - nread, BUFFER_SIZE))) > 0) {
+				nread += n;
+			}
+
+			// if the last call to source.read() returned -1, we are done;
+			// otherwise, try to read one more byte; if that fails, we're done too
+			if (n < 0 || (n = source.read()) < 0) {
+				break;
+			}
+
+			// one more byte was read; need to allocate a larger buffer
+			if (capacity <= MAX_BUFFER_SIZE - capacity) {
+				capacity = Math.max(capacity << 1, BUFFER_SIZE);
+			} else {
+				if (capacity == MAX_BUFFER_SIZE) {
+					throw new OutOfMemoryError("Required array size too large");
+				}
+				capacity = MAX_BUFFER_SIZE;
+			}
+			buf = Arrays.copyOf(buf, capacity);
+			buf[nread++] = (byte) n;
+		}
+		return (capacity == nread) ?
+			buf : Arrays.copyOf(buf, nread);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Deleting directories on standard File Systems
 	// ------------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
index 76b7805..c3cd7ba 100644
--- a/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.util;
 
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CheckedThread;
@@ -31,12 +32,16 @@ import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Random;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -60,6 +65,41 @@ public class FileUtilsTest extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	@Test
+	public void testReadAllBytes() throws Exception {
+		TemporaryFolder tmpFolder = null;
+		try {
+			tmpFolder = new TemporaryFolder(new File(this.getClass().getResource("/").getPath()));
+			tmpFolder.create();
+
+			final int fileSize = 1024;
+			final String testFilePath = tmpFolder.getRoot().getAbsolutePath() + File.separator
+				+ this.getClass().getSimpleName() + "_" + fileSize + ".txt";
+
+			{
+				String expectedMD5 = generateTestFile(testFilePath, 1024);
+				final byte[] data = FileUtils.readAllBytes((new File(testFilePath)).toPath());
+				assertEquals(expectedMD5, md5Hex(data));
+			}
+
+			{
+				String expectedMD5 = generateTestFile(testFilePath, 4096);
+				final byte[] data = FileUtils.readAllBytes((new File(testFilePath)).toPath());
+				assertEquals(expectedMD5, md5Hex(data));
+			}
+
+			{
+				String expectedMD5 = generateTestFile(testFilePath, 5120);
+				final byte[] data = FileUtils.readAllBytes((new File(testFilePath)).toPath());
+				assertEquals(expectedMD5, md5Hex(data));
+			}
+		} finally {
+			if (tmpFolder != null) {
+				tmpFolder.delete();
+			}
+		}
+	}
+
+	@Test
 	public void testDeletePathIfEmpty() throws IOException {
 		final FileSystem localFs = FileSystem.getLocalFileSystem();
 
@@ -272,6 +312,51 @@ public class FileUtilsTest extends TestLogger {
 		}
 	}
 
+	/**
+	 * Generates a file with random content.
+	 *
+	 * @param outputFile the path of the output file
+	 * @param length the size of content to generate
+	 *
+	 * @return the MD5 hex string of the generated content
+	 *
+	 * @throws IOException if writing the file fails
+	 * @throws NoSuchAlgorithmException if the MD5 algorithm is not available
+	 */
+	private static String generateTestFile(String outputFile, int length) throws IOException, NoSuchAlgorithmException {
+		Path outputFilePath = new Path(outputFile);
+
+		final FileSystem fileSystem = outputFilePath.getFileSystem();
+		try (final FSDataOutputStream fsDataOutputStream = fileSystem.create(outputFilePath, FileSystem.WriteMode.OVERWRITE)) {
+			return writeRandomContent(fsDataOutputStream, length);
+		}
+	}
+
+	private static String writeRandomContent(OutputStream out, int length) throws IOException, NoSuchAlgorithmException {
+		MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+
+		Random random = new Random();
+		char startChar = 32, endChar = 127;
+		for (int i = 0; i < length; i++) {
+			int rnd = random.nextInt(endChar - startChar);
+			byte b = (byte) (startChar + rnd);
+
+			out.write(b);
+			messageDigest.update(b);
+		}
+
+		byte[] b = messageDigest.digest();
+		return org.apache.flink.util.StringUtils.byteToHexString(b);
+	}
+
+	private static String md5Hex(byte[] data) throws NoSuchAlgorithmException {
+		MessageDigest messageDigest = MessageDigest.getInstance("MD5");
+		messageDigest.update(data);
+
+		byte[] b = messageDigest.digest();
+		return org.apache.flink.util.StringUtils.byteToHexString(b);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class Deleter extends CheckedThread {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index bb038eb..11afe50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
+import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -34,7 +35,6 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.file.Files;
 import java.util.Collection;
 
 /**
@@ -300,7 +300,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 				// (it is deleted automatically on the BLOB server and cache when the job
 				// enters a terminal state)
 				SerializedValue<JobInformation> serializedValue =
-					SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+					SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
 				serializedJobInformation = new NonOffloaded<>(serializedValue);
 			}
 
@@ -315,7 +315,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 				// (it is deleted automatically on the BLOB server and cache when the job
 				// enters a terminal state)
 				SerializedValue<TaskInformation> serializedValue =
-					SerializedValue.fromBytes(Files.readAllBytes(dataFile.toPath()));
+					SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
 				serializedTaskInformation = new NonOffloaded<>(serializedValue);
 			}
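
A note on why the patch works, for readers skimming the diff: on OpenJDK 8, Files.readAllBytes(Path) asks the underlying channel-backed stream for all remaining bytes in a single read() call, and sun.nio's IOUtil services each such request through a cached per-thread temporary direct buffer sized to the request. Reading a large offloaded TaskDeploymentDescriptor file could therefore allocate a file-sized direct buffer and exceed the direct memory limit. The new FileUtils.readAllBytes() keeps the growth and EOF semantics of the JDK method but caps every request at BUFFER_SIZE (4096 bytes), so the temporary direct buffer stays at 4 KiB. The sketch below isolates that chunked-read pattern in a self-contained form; it is illustrative only (the class name, CHUNK_SIZE, and the command-line usage are made up for this example, not part of the patch), and unlike the patch's read() it does not grow the array if the file grows while being read.

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

public class BoundedReadSketch {

	// cap on each read request; mirrors BUFFER_SIZE in the patch above
	private static final int CHUNK_SIZE = 4096;

	// largest array size that can safely be allocated on most VMs,
	// the same guard as MAX_BUFFER_SIZE in the patch
	private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

	public static byte[] readAllBytesBounded(Path path) throws IOException {
		long size = Files.size(path);
		if (size > MAX_ARRAY_SIZE) {
			throw new OutOfMemoryError("Required array size too large");
		}

		byte[] buf = new byte[(int) size];
		int nread = 0;
		try (InputStream in = Files.newInputStream(path)) {
			int n;
			// never request more than CHUNK_SIZE bytes at once: a
			// channel-backed stream allocates a temporary direct buffer
			// sized to each request, so this bounds direct memory usage
			while (nread < buf.length
					&& (n = in.read(buf, nread, Math.min(buf.length - nread, CHUNK_SIZE))) > 0) {
				nread += n;
			}
		}
		// trim if the file shrank while we were reading
		return (nread == buf.length) ? buf : Arrays.copyOf(buf, nread);
	}

	public static void main(String[] args) throws IOException {
		// hypothetical usage: the file path is supplied on the command line
		byte[] data = readAllBytesBounded(Paths.get(args[0]));
		System.out.println("read " + data.length + " bytes");
	}
}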