zentol commented on a change in pull request #7797: [FLINK-11379] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD
URL: https://github.com/apache/flink/pull/7797#discussion_r261178798
########## File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ##########
@@ -107,6 +120,89 @@ public static void writeFileUtf8(File file, String contents) throws IOException
 		writeFile(file, contents, "UTF-8");
 	}
 
+	/**
+	 * Reads all the bytes from a file. The method ensures that the file is
+	 * closed when all bytes have been read or an I/O error, or other runtime
+	 * exception, is thrown.
+	 *
+	 * <p>This implementation follows {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)},
+	 * but additionally limits the size of the direct buffer used per read to
+	 * avoid a direct-buffer OutOfMemoryError. Once
+	 * {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)} or another
+	 * JDK API can do this, this method should be removed.
+	 *
+	 * @param path
+	 *        the path to the file
+	 * @return a byte array containing the bytes read from the file
+	 *
+	 * @throws IOException
+	 *         if an I/O error occurs reading from the stream
+	 * @throws OutOfMemoryError
+	 *         if an array of the required size cannot be allocated, for
+	 *         example if the file is larger than {@code 2GB}
+	 */
+	public static byte[] readAllBytes(java.nio.file.Path path) throws IOException {
+		try (SeekableByteChannel channel = Files.newByteChannel(path);
+				InputStream in = Channels.newInputStream(channel)) {
+
+			long size = channel.size();
+			if (size > (long) MAX_BUFFER_SIZE) {
+				throw new OutOfMemoryError("Required array size too large");
+			}
+
+			return read(in, (int) size);
+		}
+	}
+
+	/**
+	 * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint
+	 * about how many bytes the stream will have, and caps the length requested
+	 * per read to limit the size of the direct buffer used to read.
+	 *
+	 * @param source
+	 *        the input stream to read from
+	 * @param initialSize
+	 *        the initial size of the byte array to allocate
+	 * @return a byte array containing the bytes read from the file
+	 *
+	 * @throws IOException
+	 *         if an I/O error occurs reading from the stream
+	 * @throws OutOfMemoryError
+	 *         if an array of the required size cannot be allocated
+	 */
+	private static byte[] read(InputStream source, int initialSize) throws IOException {
+		int capacity = initialSize;
+		byte[] buf = new byte[capacity];

Review comment:
   I don't see how this solves the issue. `capacity` is still the size of the entire file. The only difference is that we pass `BUFFER_SIZE` to `InputStream#read`, but that method never allocates an array.
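For context, the chunk-limited pattern under discussion can be sketched as follows. This is an illustrative sketch, not the PR's actual code: `ChunkedRead`, `readFully`, and the `BUFFER_SIZE` value are assumed names. As the reviewer notes, the heap array for the result is still sized to the whole file; capping the length passed to each `read` call only bounds the temporary direct buffer that the NIO channel adapter may allocate per call.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

// Illustrative sketch of chunk-limited reading; names are assumptions,
// not the Flink FileUtils implementation.
public class ChunkedRead {

	// Cap on the length requested from each InputStream#read call, so a
	// channel-backed stream never needs a direct buffer larger than this.
	private static final int BUFFER_SIZE = 8192;

	static byte[] readFully(InputStream in, int expectedSize) throws IOException {
		// The result array is still heap-allocated at full size; that part
		// is unavoidable if the caller wants the whole file as one byte[].
		byte[] buf = new byte[expectedSize];
		int total = 0;
		while (total < expectedSize) {
			// Request at most BUFFER_SIZE bytes per call.
			int n = in.read(buf, total, Math.min(BUFFER_SIZE, expectedSize - total));
			if (n < 0) {
				// Stream ended earlier than expected; return what was read.
				return Arrays.copyOf(buf, total);
			}
			total += n;
		}
		return buf;
	}

	public static void main(String[] args) throws IOException {
		byte[] data = new byte[20000];
		for (int i = 0; i < data.length; i++) {
			data[i] = (byte) i;
		}
		byte[] out = readFully(new ByteArrayInputStream(data), data.length);
		System.out.println(Arrays.equals(data, out)); // prints "true"
	}
}
```

The point of the cap is visible in `Math.min(BUFFER_SIZE, expectedSize - total)`: the destination array is full-sized, but no single `read` asks the underlying channel for more than `BUFFER_SIZE` bytes at once.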