[ 
https://issues.apache.org/jira/browse/HADOOP-18296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17966764#comment-17966764
 ] 

ASF GitHub Bot commented on HADOOP-18296:
-----------------------------------------

YanivKunda commented on code in PR #7732:
URL: https://github.com/apache/hadoop/pull/7732#discussion_r2143605258


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java:
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.ByteBufferPool;
+
+/**
+ * A wrapper {@link ByteBufferPool} implementation that tracks whether all 
allocated buffers
+ * are released.
+ * <p>
+ * It throws the related exception at {@link #close()} if any buffer remains 
un-released.
+ * It also clears the buffers at release so if they continued being used it'll 
generate errors.
+ * <p>
+ * To be used for testing only.
+ * <p>
+ * The stacktraces of the allocation are not stored by default because
+ * it can significantly decreases the unit test performance.
+ * Configuring this class to log at DEBUG will trigger their collection.
+ * @see ByteBufferAllocationStacktraceException
+ * <p>
+ * Adapted from Parquet class {@code 
org.apache.parquet.bytes.TrackingByteBufferAllocator}.
+ */
+public final class TrackingByteBufferPool implements ByteBufferPool, 
AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TrackingByteBufferPool.class);
+
+  /**
+   * Wrap an existing allocator with this tracking allocator.
+   * @param allocator allocator to wrap.
+   * @return a new allocator.
+   */
+  public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
+    return new TrackingByteBufferPool(allocator);
+  }
+
+  /**
+   * Key for the tracker map.
+   * This uses the identity hash code of the buffer as the hash code
+   * for the map.
+   */
+  private static class Key {
+
+    private final int hashCode;
+
+    private final ByteBuffer buffer;
+
+    Key(ByteBuffer buffer) {
+      hashCode = System.identityHashCode(buffer);
+      this.buffer = buffer;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Key key = (Key) o;
+      return this.buffer == key.buffer;
+    }
+
+    @Override
+    public int hashCode() {
+      return hashCode;
+    }
+
+    @Override
+    public String toString() {
+      return buffer.toString();
+    }
+  }
+
+  public static class LeakDetectorHeapByteBufferPoolException
+      extends RuntimeException {
+
+    private LeakDetectorHeapByteBufferPoolException(String msg) {
+      super(msg);
+    }
+
+    private LeakDetectorHeapByteBufferPoolException(String msg, Throwable 
cause) {
+      super(msg, cause);
+    }
+
+    private LeakDetectorHeapByteBufferPoolException(
+        String message,
+        Throwable cause,
+        boolean enableSuppression,
+        boolean writableStackTrace) {
+      super(message, cause, enableSuppression, writableStackTrace);
+    }
+  }
+
+  /**
+   * Strack trace of allocation as saved in the tracking map.
+   */
+  public static final class ByteBufferAllocationStacktraceException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    /**
+     * Single stack trace instance to use when DEBUG is not enabled.
+     */
+    private static final ByteBufferAllocationStacktraceException 
WITHOUT_STACKTRACE =
+        new ByteBufferAllocationStacktraceException(false);
+
+    /**
+     * Create a stack trace for the map, either using the shared static one
+     * or a dynamically created one.
+     * @return a stack
+     */
+    private static ByteBufferAllocationStacktraceException create() {
+      return LOG.isDebugEnabled()
+          ? new ByteBufferAllocationStacktraceException()
+          : WITHOUT_STACKTRACE;
+    }
+
+    private ByteBufferAllocationStacktraceException() {
+      super("Allocation stacktrace of the first ByteBuffer:");
+    }
+
+    /**
+     * Private constructor to for the singleton {@link #WITHOUT_STACKTRACE},
+     * telling develoers how to see a trace per buffer.
+     */
+    private ByteBufferAllocationStacktraceException(boolean unused) {
+      super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for 
stack traces",
+          null,
+          false,
+          false);
+    }
+  }
+
+  /**
+   * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} 
if the
+   * buffer to release was not in the hash map.
+   */
+  public static final class ReleasingUnallocatedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
+      super(String.format("Releasing a ByteBuffer instance that is not 
allocated"
+          + " by this buffer pool or already been released: %s size %d", b, 
b.capacity()));
+    }
+  }
+
+  /**
+   * Exception raised in {@link TrackingByteBufferPool#close()} if there
+   * was an unreleased buffer.
+   */
+  public static final class LeakedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    private final int count;
+
+    private LeakedByteBufferException(int count, 
ByteBufferAllocationStacktraceException e) {
+      super(count + " ByteBuffer object(s) is/are remained unreleased"
+          + " after closing this buffer pool.", e);
+      this.count = count;
+    }
+
+    /**
+     * Get the number of unreleased buffers.
+     * @return number of unreleased buffers
+     */
+    public int getCount() {
+      return count;
+    }
+  }
+
+  /**
+   * Tracker of allocations.
+   * <p>
+   * The key maps by the object id of the buffer, and refers to either a 
common stack trace
+   * or one dynamically created for each allocation.
+   */
+  private final Map<Key, ByteBufferAllocationStacktraceException> allocated =

Review Comment:
   I think this can be replaced with `java.util.IdentityHashMap`, which will 
make the `Key` class redundant:
   ```suggestion
     private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> 
allocated = new IdentityHashMap();
   ```





> Memory fragmentation in ChecksumFileSystem Vectored IO implementation.
> ----------------------------------------------------------------------
>
>                 Key: HADOOP-18296
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18296
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: common
>    Affects Versions: 3.4.0
>            Reporter: Mukund Thakur
>            Assignee: Steve Loughran
>            Priority: Minor
>              Labels: fs, pull-request-available
>
> As we have implemented merging of ranges in the ChecksumFSInputChecker 
> implementation of vectored IO api, it can lead to memory fragmentation. Let 
> me explain by example.
>  
> Suppose client requests for 3 ranges. 
> 0-500, 700-1000 and 1200-1500.
> Now because of merging, all the above ranges will get merged into one and we 
> will allocate a big byte buffer of 0-1500 size but return sliced byte buffers 
> for the desired ranges.
> Now once the client is done reading all the ranges, it will only be able to 
> free the memory for requested ranges and memory of the gaps will never be 
> released for eg here (500-700 and 1000-1200).
>  
> Note this only happens for direct byte buffers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to