zentol commented on a change in pull request #7797: [FLINK-11379] Fix OutOfMemoryError caused by Files.readAllBytes() when TM loads a large size TDD
URL: https://github.com/apache/flink/pull/7797#discussion_r261178798
 
 

 ##########
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##########
 @@ -107,6 +120,89 @@ public static void writeFileUtf8(File file, String contents) throws IOException
                writeFile(file, contents, "UTF-8");
        }
 
+       /**
+        * Reads all the bytes from a file. The method ensures that the file is
+        * closed when all bytes have been read or an I/O error, or other runtime
+        * exception, is thrown.
+        *
+        * <p>This is an implementation that follows {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)},
+        * and the difference is that it limits the size of the direct buffer to avoid
+        * direct-buffer OutOfMemoryError. When {@link java.nio.file.Files#readAllBytes(java.nio.file.Path)}
+        * or other interfaces in the Java API can do this in the future, we should remove it.
+        *
+        * @param path
+        *        the path to the file
+        * @return a byte array containing the bytes read from the file
+        *
+        * @throws IOException
+        *         if an I/O error occurs reading from the stream
+        * @throws OutOfMemoryError
+        *         if an array of the required size cannot be allocated, for
+        *         example if the file is larger than {@code 2GB}
+        */
+       public static byte[] readAllBytes(java.nio.file.Path path) throws IOException {
+               try (SeekableByteChannel channel = Files.newByteChannel(path);
+                       InputStream in = Channels.newInputStream(channel)) {
+
+                       long size = channel.size();
+                       if (size > (long) MAX_BUFFER_SIZE) {
+                               throw new OutOfMemoryError("Required array size too large");
+                       }
+
+                       return read(in, (int) size);
+               }
+       }
+
+       /**
+        * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint
+        * about how many bytes the stream will have and uses {@code directBufferSize}
+        * to limit the size of the direct buffer used to read.
+        *
+        * @param source
+        *        the input stream to read from
+        * @param initialSize
+        *        the initial size of the byte array to allocate
+        * @return a byte array containing the bytes read from the file
+        *
+        * @throws IOException
+        *         if an I/O error occurs reading from the stream
+        * @throws OutOfMemoryError
+        *         if an array of the required size cannot be allocated
+        */
+       private static byte[] read(InputStream source, int initialSize) throws IOException {
+               int capacity = initialSize;
+               byte[] buf = new byte[capacity];
 
 Review comment:
   I don't see how this solves the issue: `capacity` is still the size of the entire file.
   
   The only difference is that we pass `BUFFER_SIZE` to `InputStream#read`, but that method never allocates an array.
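To make the point concrete, here is a minimal sketch of a read loop modeled on the JDK's private `Files.read(InputStream, int)` helper, assuming the PR's `read` follows that pattern (the rest of the method is cut off in this hunk). `BoundedReadSketch`, `readBounded`, `CHUNK_SIZE`, and `RecordingStream` are hypothetical names, and the constant values are assumptions. The sketch shows that capping the length only changes the `len` argument of each `InputStream#read` call; the heap array is still allocated up front at `initialSize`, i.e. the file size.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

public class BoundedReadSketch {

    // Assumed values; the PR's actual constants are not shown in this hunk.
    static final int CHUNK_SIZE = 4096;
    static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;

    static byte[] readBounded(InputStream source, int initialSize) throws IOException {
        int capacity = initialSize;
        // The heap array is sized to the whole file here, before any read call.
        byte[] buf = new byte[capacity];
        int nread = 0;
        int n;
        for (;;) {
            // Only the length requested per call is capped; every call writes
            // into the same full-size array.
            while ((n = source.read(buf, nread, Math.min(capacity - nread, CHUNK_SIZE))) > 0) {
                nread += n;
            }
            // Stop on EOF, or if a one-byte probe finds no more data.
            if (n < 0 || (n = source.read()) < 0) {
                break;
            }
            // The stream was longer than initialSize: grow the array.
            if (capacity <= MAX_BUFFER_SIZE - capacity) {
                capacity = Math.max(capacity << 1, CHUNK_SIZE);
            } else {
                if (capacity == MAX_BUFFER_SIZE) {
                    throw new OutOfMemoryError("Required array size too large");
                }
                capacity = MAX_BUFFER_SIZE;
            }
            buf = Arrays.copyOf(buf, capacity);
            buf[nread++] = (byte) n;
        }
        return (capacity == nread) ? buf : Arrays.copyOf(buf, nread);
    }

    // Hypothetical helper: remembers the largest len passed to read(byte[], int, int).
    static class RecordingStream extends ByteArrayInputStream {
        int maxLen = 0;

        RecordingStream(byte[] data) {
            super(data);
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) {
            maxLen = Math.max(maxLen, len);
            return super.read(b, off, len);
        }
    }
}
```

With a 10,000-byte input and `initialSize` equal to its length, the returned array has length 10,000 while no single `read` call requests more than 4,096 bytes.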

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services