xborder commented on code in PR #954:
URL: https://github.com/apache/arrow-java/pull/954#discussion_r2683092709


##########
flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java:
##########
@@ -377,6 +382,121 @@ private static int readRawVarint32(int firstByte, 
InputStream is) throws IOExcep
     return CodedInputStream.readRawVarint32(firstByte, is);
   }
 
+  /**
+   * Reads data from the stream into an ArrowBuf, without copying data when 
possible.
+   *
+   * <p>First attempts to transfer ownership of the gRPC buffer to Arrow via 
{@link
+   * #wrapGrpcBuffer}. This avoids any memory copy when the gRPC transport 
provides a direct
+   * ByteBuffer (e.g., Netty).
+   *
+   * <p>If not possible (e.g., heap buffer, fragmented data, or unsupported 
transport), falls back
+   * to allocating a new buffer and copying data into it.
+   *
+   * @param allocator The allocator to use for buffer allocation
+   * @param stream The input stream to read from
+   * @param size The number of bytes to read
+   * @return An ArrowBuf containing the data
+   * @throws IOException if there is an error reading from the stream
+   */
+  private static ArrowBuf readBuffer(BufferAllocator allocator, InputStream 
stream, int size)
+      throws IOException {
+    if (ENABLE_ZERO_COPY_READ) {
+      ArrowBuf zeroCopyBuf = wrapGrpcBuffer(stream, allocator, size);
+      if (zeroCopyBuf != null) {
+        return zeroCopyBuf;
+      }
+    }
+
+    // Fall back to allocating and copying
+     ArrowBuf buf = allocator.buffer(size);
+     byte[] heapBytes = new byte[size];
+     ByteStreams.readFully(stream, heapBytes);
+     buf.writeBytes(heapBytes);
+     buf.writerIndex(size);
+     return buf;
+  }
+
+  /**
+   * Attempts to wrap gRPC's buffer as an ArrowBuf without copying.
+   *
+   * <p>This method takes ownership of gRPC's underlying buffer via {@link 
Detachable#detach()} and
+   * wraps it as an ArrowBuf using {@link 
BufferAllocator#wrapForeignAllocation}. The gRPC buffer
+   * will be released when the ArrowBuf is closed.
+   *
+   * @param stream The gRPC-provided InputStream
+   * @param allocator The allocator to use for wrapping the foreign allocation
+   * @param size The number of bytes to wrap
+   * @return An ArrowBuf wrapping gRPC's buffer, or {@code null} if zero-copy 
is not possible
+   */
+  static ArrowBuf wrapGrpcBuffer(
+      final InputStream stream, final BufferAllocator allocator, final int 
size) {
+
+    if (!(stream instanceof Detachable) || !(stream instanceof HasByteBuffer)) 
{
+      return null;
+    }
+
+    HasByteBuffer hasByteBuffer = (HasByteBuffer) stream;
+    if (!hasByteBuffer.byteBufferSupported()) {
+      return null;
+    }
+
+    ByteBuffer peekBuffer = hasByteBuffer.getByteBuffer();
+    if (peekBuffer == null) {
+      return null;
+    }
+    if (!peekBuffer.isDirect()) {
+      return null;
+    }
+    if (peekBuffer.remaining() < size) {
+      // Data is fragmented across multiple buffers; zero-copy not possible
+      return null;
+    }
+
+    // Take ownership
+    Detachable detachable = (Detachable) stream;
+    InputStream detachedStream = detachable.detach();
+
+    // Get buffer from detached stream
+    HasByteBuffer detachedHasByteBuffer = (HasByteBuffer) detachedStream;
+    ByteBuffer detachedByteBuffer = detachedHasByteBuffer.getByteBuffer();
+
+    if (detachedByteBuffer == null || !detachedByteBuffer.isDirect()) {
+      closeQuietly(detachedStream);
+      return null;
+    }

Review Comment:
   Good catch. Actually, this is redundant: `isDirect` is already checked at
the beginning of the method, so I just removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to