[ 
https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18008002#comment-18008002
 ] 

ASF GitHub Bot commented on HADOOP-19613:
-----------------------------------------

anmolanmol1234 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2215380532


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * The Read Buffer Manager for Rest AbfsClient.
+ * V1 implementation of ReadBufferManager.
+ */
+final class ReadBufferManagerV1 implements ReadBufferManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      ReadBufferManagerV1.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;
+
+  private static final int NUM_BUFFERS = 16;
+  private static final int NUM_THREADS = 8;
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have 
to see if 3 seconds is a good threshold
+
+  private static int blockSize = 4 * ONE_MB;
+  private static int thresholdAgeMilliseconds = 
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
+  private Thread[] threads = new Thread[NUM_THREADS];
+  private byte[][] buffers;    // array of byte[] buffers, to hold the data 
that is read
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
+
+  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of 
requests that are not picked up by any worker thread yet
+  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // 
requests being processed by worker threads
+  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // 
buffers available for reading
+  private static ReadBufferManagerV1 bufferManager; // singleton, initialized 
in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+
+  static ReadBufferManagerV1 getBufferManager() {
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManagerV1();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
+  }
+
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManagerV1 not initialized yet. Overriding 
readAheadBlockSize as {}",
+          readAheadBlockSize);
+      blockSize = readAheadBlockSize;
+    }
+  }
+
+  private void init() {
+    buffers = new byte[NUM_BUFFERS][];
+    for (int i = 0; i < NUM_BUFFERS; i++) {
+      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte 
array never goes back to GC
+      freeList.add(i);
+    }
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Thread t = new Thread(new ReadBufferWorker(i));
+      t.setDaemon(true);
+      threads[i] = t;
+      t.setName("ABFS-prefetch-" + i);
+      t.start();
+    }
+    ReadBufferWorker.UNLEASH_WORKERS.countDown();
+  }
+
+  // hide instance constructor
+  private ReadBufferManagerV1() {
+    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+  }
+
+  /**
+   * {@link AbfsInputStream} calls this method to queue read-aheads.
+   *
+   * @param stream          The {@link AbfsInputStream} for which to do the 
read-ahead
+   * @param requestedOffset The offset in the file which shoukd be read
+   * @param requestedLength The length to read

Review Comment:
   tracing context missing in params





> ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code with the 
> current working code
> ------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19613
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19613
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>
> Read Buffer Manager used today was introduced way back and has been stable 
> for quite a while.
> Read Buffer Manager to be introduced as part of 
> https://issues.apache.org/jira/browse/HADOOP-19596 will introduce many 
> changes incrementally over time. While the development goes on and we are 
> able to fully stabilise the optimized version we need the current flow to be 
> functional and undisturbed. 
> This work item is to isolate that from new code by refactoring 
> ReadBufferManager class to have 2 different implementations with same public 
> interfaces: ReadBufferManagerV1 and ReadBufferManagerV2.
> This will also introduce new configs that can be used to toggle between new 
> and old code. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to