This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new cbc45c08e5a HBASE-28292 Make Delay prefetch property to be dynamically 
configured (#5605)
cbc45c08e5a is described below

commit cbc45c08e5aec9da3f87cfe8f400e94d8054766b
Author: Abhishek Kothalikar <99398985+kabhish...@users.noreply.github.com>
AuthorDate: Tue Apr 16 18:03:06 2024 +0530

    HBASE-28292 Make Delay prefetch property to be dynamically configured 
(#5605)
    
    Signed-off-by: Wellington Chevreuil <wchevre...@apache.org>
    Signed-off-by: Peter Somogyi <psomo...@apache.org>
    
    (cherry picked from commit 16e9affca37f0027e1bc66e873cb291097aa75dd)
---
 .../org/apache/hadoop/hbase/io/hfile/HFile.java    |   2 +
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |   9 ++
 .../hadoop/hbase/io/hfile/PrefetchExecutor.java    | 104 ++++++++++++++++++---
 .../hadoop/hbase/regionserver/HRegionServer.java   |   7 ++
 .../regionserver/PrefetchExecutorNotifier.java     |  75 +++++++++++++++
 .../apache/hadoop/hbase/io/hfile/TestPrefetch.java |  57 ++++++++++-
 6 files changed, 239 insertions(+), 15 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index d18194e95c4..c805e84dd32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -455,6 +455,8 @@ public final class HFile {
 
     boolean prefetchComplete();
 
+    boolean prefetchStarted(); // true once block prefetching has started, false otherwise
+
     /**
      * To close the stream's socket. Note: This can be concurrently called 
from multiple threads and
      * implementation should take care of thread safety.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index c044a1179e0..794655ef8a8 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1618,6 +1618,15 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
     return PrefetchExecutor.isCompleted(path);
   }
 
+  /**
+   * Reports whether block prefetching has started, i.e. whether the prefetch delay has already
+   * expired for at least one scheduled prefetch. Delegates to
+   * {@link PrefetchExecutor#isPrefetchStarted()}, which inspects every prefetch the executor is
+   * tracking rather than only the one belonging to this reader's file.
+   * @return true if block prefetching was started after waiting for the specified delay, false
+   *         otherwise
+   */
+  @Override
+  public boolean prefetchStarted() {
+    return PrefetchExecutor.isPrefetchStarted();
+  }
+
   /**
    * Create a Scanner on this file. No seeks or reads are done on creation. 
Call
    * {@link HFileScanner#seekTo(Cell)} to position an start the read. There is 
nothing to clean up
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
index bf27cbcfbc8..707515fd8af 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java
@@ -17,20 +17,24 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,23 +43,30 @@ import org.slf4j.LoggerFactory;
 public final class PrefetchExecutor {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(PrefetchExecutor.class);
+  /** Wait time in milliseconds before executing prefetch */
+  public static final String PREFETCH_DELAY = "hbase.hfile.prefetch.delay";
+  public static final String PREFETCH_DELAY_VARIATION = 
"hbase.hfile.prefetch.delay.variation";
+  public static final float PREFETCH_DELAY_VARIATION_DEFAULT_VALUE = 0.2f;
 
   /** Futures for tracking block prefetch activity */
   private static final Map<Path, Future<?>> prefetchFutures = new 
ConcurrentSkipListMap<>();
+  /** Runnables for resetting the prefetch activity */
+  private static final Map<Path, Runnable> prefetchRunnable = new 
ConcurrentSkipListMap<>();
   /** Executor pool shared among all HFiles for block prefetch */
   private static final ScheduledExecutorService prefetchExecutorPool;
   /** Delay before beginning prefetch */
-  private static final int prefetchDelayMillis;
+  private static int prefetchDelayMillis;
   /** Variation in prefetch delay times, to mitigate stampedes */
-  private static final float prefetchDelayVariation;
+  private static float prefetchDelayVariation;
   static {
     // Consider doing this on demand with a configuration passed in rather
     // than in a static initializer.
     Configuration conf = HBaseConfiguration.create();
     // 1s here for tests, consider 30s in hbase-default.xml
     // Set to 0 for no delay
-    prefetchDelayMillis = conf.getInt("hbase.hfile.prefetch.delay", 1000);
-    prefetchDelayVariation = 
conf.getFloat("hbase.hfile.prefetch.delay.variation", 0.2f);
+    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
+    prefetchDelayVariation =
+      conf.getFloat(PREFETCH_DELAY_VARIATION, 
PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
     int prefetchThreads = conf.getInt("hbase.hfile.thread.prefetch", 4);
     prefetchExecutorPool = new ScheduledThreadPoolExecutor(prefetchThreads, 
new ThreadFactory() {
       @Override
@@ -87,23 +98,28 @@ public final class PrefetchExecutor {
         delay = 0;
       }
       try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Prefetch requested for " + path + ", delay=" + delay + " 
ms");
-        }
-        prefetchFutures.put(path,
-          prefetchExecutorPool.schedule(runnable, delay, 
TimeUnit.MILLISECONDS));
+        LOG.debug("Prefetch requested for {}, delay={} ms", path, delay);
+        final Runnable tracedRunnable =
+          TraceUtil.wrap(runnable, "PrefetchExecutor.request");
+        final Future<?> future =
+          prefetchExecutorPool.schedule(tracedRunnable, delay, 
TimeUnit.MILLISECONDS);
+        prefetchFutures.put(path, future);
+        prefetchRunnable.put(path, runnable);
       } catch (RejectedExecutionException e) {
         prefetchFutures.remove(path);
-        LOG.warn("Prefetch request rejected for " + path);
+        prefetchRunnable.remove(path);
+        LOG.warn("Prefetch request rejected for {}", path);
       }
     }
   }
 
   public static void complete(Path path) {
     prefetchFutures.remove(path);
+    prefetchRunnable.remove(path);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Prefetch completed for " + path);
+      LOG.debug("Prefetch completed for {}", path.getName());
     }
+    LOG.debug("Prefetch completed for {}", path);
   }
 
   public static void cancel(Path path) {
@@ -112,9 +128,18 @@ public final class PrefetchExecutor {
       // ok to race with other cancellation attempts
       future.cancel(true);
       prefetchFutures.remove(path);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Prefetch cancelled for " + path);
-      }
+      prefetchRunnable.remove(path);
+      LOG.debug("Prefetch cancelled for {}", path);
+    }
+  }
+
+  /**
+   * Stops tracking the prefetch future registered for {@code path} and cancels it, interrupting
+   * the task if it is already running. Unlike {@link #cancel(Path)}, the entry in
+   * {@code prefetchRunnable} is left in place, so the same runnable can be submitted again.
+   * Does nothing when no future is registered for the path.
+   * @param path the file whose prefetch should be interrupted
+   */
+  public static void interrupt(Path path) {
+    final Future<?> pending = prefetchFutures.get(path);
+    if (pending == null) {
+      return;
+    }
+    prefetchFutures.remove(path);
+    // ok to race with other cancellation attempts
+    pending.cancel(true);
+    LOG.debug("Prefetch cancelled for {}", path);
+  }
 
@@ -130,7 +155,58 @@ public final class PrefetchExecutor {
   }
 
   /* Visible for testing only */
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
   static ScheduledExecutorService getExecutorPool() {
     return prefetchExecutorPool;
   }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  static Map<Path, Future<?>> getPrefetchFutures() {
+    return prefetchFutures; // the live tracking map itself, not a copy
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  static Map<Path, Runnable> getPrefetchRunnable() {
+    return prefetchRunnable; // the live path-to-runnable map itself, not a copy
+  }
+
+  /**
+   * Reports whether any tracked prefetch has passed its scheduled start time.
+   * <p>
+   * The check covers every future currently held in {@code prefetchFutures}; it is not specific
+   * to a single file. Values are read straight from the map's value view, so an entry removed
+   * concurrently by another thread cannot produce a null lookup.
+   * @return true if at least one tracked prefetch has a negative remaining delay, false otherwise
+   */
+  static boolean isPrefetchStarted() {
+    for (Future<?> future : prefetchFutures.values()) {
+      // Only scheduled futures carry a delay; a negative remaining delay means the start time
+      // has passed.
+      if (
+        future instanceof ScheduledFuture
+          && ((ScheduledFuture<?>) future).getDelay(TimeUnit.MILLISECONDS) < 0
+      ) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static int getPrefetchDelay() {
+    return prefetchDelayMillis; // delay before beginning prefetch, in milliseconds
+  }
+
+  /**
+   * Re-reads the prefetch delay and delay variation from {@code conf} and re-schedules the
+   * prefetches that are still waiting for their delay to expire, so that they pick up the new
+   * settings.
+   * <p>
+   * Prefetches whose delay has already expired are left alone: interrupting them would throw
+   * away work already in progress.
+   * @param conf configuration to read {@link #PREFETCH_DELAY} and
+   *             {@link #PREFETCH_DELAY_VARIATION} from
+   */
+  public static void loadConfiguration(Configuration conf) {
+    prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
+    prefetchDelayVariation =
+      conf.getFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
+    prefetchFutures.forEach((path, future) -> {
+      // The runnable can disappear concurrently (complete/cancel); never re-submit a null task.
+      final Runnable runnable = prefetchRunnable.get(path);
+      if (
+        runnable != null && future instanceof ScheduledFuture
+          && ((ScheduledFuture<?>) future).getDelay(TimeUnit.MILLISECONDS) > 0
+      ) {
+        // the task is still pending delay expiration and has not started to run yet, so it can
+        // be re-scheduled at no cost.
+        interrupt(path);
+        request(path, runnable);
+      }
+      LOG.debug("Reset called on Prefetch of file {} with delay {}, delay variation {}", path,
+        prefetchDelayMillis, prefetchDelayVariation);
+    });
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f521706f887..6833b66921e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -576,6 +576,9 @@ public class HRegionServer extends Thread
   // A timer to shutdown the process if abort takes too long
   private Timer abortMonitor;
 
+  // Observer that pushes prefetch configuration changes to the PrefetchExecutor
+  private PrefetchExecutorNotifier prefetchExecutorNotifier;
+
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -2121,6 +2124,9 @@ public class HRegionServer extends Thread
     // Compaction thread
     this.compactSplitThread = new CompactSplit(this);
 
+    // Prefetch Notifier
+    this.prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
+
     // Background thread to check for compactions; needed if region has not 
gotten updates
     // in a while. It will take care of not checking too frequently on 
store-by-store basis.
     this.compactionChecker = new CompactionChecker(this, 
this.compactionCheckFrequency, this);
@@ -2169,6 +2175,7 @@ public class HRegionServer extends Thread
     // Registering the compactSplitThread object with the ConfigurationManager.
     configurationManager.registerObserver(this.compactSplitThread);
     configurationManager.registerObserver(this.rpcServices);
+    configurationManager.registerObserver(this.prefetchExecutorNotifier);
     configurationManager.registerObserver(this);
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java
new file mode 100644
index 00000000000..e28c6ee1a6c
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/PrefetchExecutorNotifier.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
+import org.apache.hadoop.hbase.io.hfile.PrefetchExecutor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration observer that forwards prefetch-related configuration changes to
+ * {@link PrefetchExecutor}.
+ */
+@InterfaceAudience.Private
+public final class PrefetchExecutorNotifier implements PropagatingConfigurationObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(PrefetchExecutorNotifier.class);
+
+  /** Wait time in milliseconds before executing prefetch */
+  public static final String PREFETCH_DELAY = PrefetchExecutor.PREFETCH_DELAY;
+
+  /** Configuration supplied at construction; used only when a notification carries none. */
+  private final Configuration conf;
+
+  /**
+   * Creates a notifier.
+   * @param conf configuration the notifier is created with
+   */
+  public PrefetchExecutorNotifier(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Updates the prefetch delay settings in {@link PrefetchExecutor} from the changed
+   * configuration; the executor then interrupts and restarts prefetches as it sees fit.
+   * @param newConf the configuration after the change
+   */
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    // Read the values delivered with the notification rather than the configuration captured
+    // at construction time, which may be a different (stale) object.
+    final Configuration effectiveConf = newConf != null ? newConf : conf;
+    PrefetchExecutor.loadConfiguration(effectiveConf);
+    LOG.info("Config hbase.hfile.prefetch.delay is changed to {}",
+      effectiveConf.getInt(PREFETCH_DELAY, 1000));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void registerChildren(ConfigurationManager manager) {
+    // No children to register.
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deregisterChildren(ConfigurationManager manager) {
+    // No children to deregister.
+  }
+
+  /**
+   * Returns the prefetch delay currently in effect in {@link PrefetchExecutor}.
+   * @return the value of {@link PrefetchExecutor#getPrefetchDelay()}
+   */
+  public int getPrefetchDelay() {
+    return PrefetchExecutor.getPrefetchDelay();
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 0bd81623215..acc3911cc06 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -18,6 +18,9 @@
 package org.apache.hadoop.hbase.io.hfile;
 
 import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_DATA_BLOCKS_COMPRESSED_KEY;
+import static org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY;
+import static 
org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION;
+import static 
org.apache.hadoop.hbase.io.hfile.PrefetchExecutor.PREFETCH_DELAY_VARIATION_DEFAULT_VALUE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.PrefetchExecutorNotifier;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.TestHStoreFile;
@@ -73,7 +77,6 @@ public class TestPrefetch {
   private static final int NUM_VALID_KEY_TYPES = KeyValue.Type.values().length 
- 2;
   private static final int DATA_BLOCK_SIZE = 2048;
   private static final int NUM_KV = 1000;
-
   private Configuration conf;
   private CacheConfig cacheConf;
   private FileSystem fs;
@@ -244,6 +247,54 @@ public class TestPrefetch {
 
   }
 
+  /**
+   * Verifies that a configuration change notification updates the delay reported by the
+   * prefetch executor.
+   */
+  @Test
+  public void testOnConfigurationChange() {
+    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
+    try {
+      conf.setInt(PREFETCH_DELAY, 40000);
+      prefetchExecutorNotifier.onConfigurationChange(conf);
+      assertEquals(40000, prefetchExecutorNotifier.getPrefetchDelay());
+
+      conf.setInt(PREFETCH_DELAY, 30000);
+      prefetchExecutorNotifier.onConfigurationChange(conf);
+      assertEquals(30000, prefetchExecutorNotifier.getPrefetchDelay());
+    } finally {
+      // restore: the executor keeps the delay in static state shared with the other tests, so
+      // put it back even when an assertion above fails
+      conf.setInt(PREFETCH_DELAY, 1000);
+      prefetchExecutorNotifier.onConfigurationChange(conf);
+    }
+  }
+
+  /**
+   * Verifies that prefetch does not begin before a dynamically configured delay has expired and
+   * does begin once it has.
+   */
+  @Test
+  public void testPrefetchWithDelay() throws Exception {
+    // Configure custom delay
+    PrefetchExecutorNotifier prefetchExecutorNotifier = new PrefetchExecutorNotifier(conf);
+    conf.setInt(PREFETCH_DELAY, 25000);
+    conf.setFloat(PREFETCH_DELAY_VARIATION, 0.0f);
+    try {
+      prefetchExecutorNotifier.onConfigurationChange(conf);
+
+      HFileContext context = new HFileContextBuilder().withCompression(Compression.Algorithm.GZ)
+        .withBlockSize(DATA_BLOCK_SIZE).build();
+      Path storeFile = writeStoreFile("TestPrefetchWithDelay", context);
+      HFile.Reader reader = HFile.createReader(fs, storeFile, cacheConf, true, conf);
+      long startTime = System.currentTimeMillis();
+
+      // Wait for 20 seconds, no thread should start prefetch
+      Thread.sleep(20000);
+      assertFalse("Prefetch threads should not be running at this point",
+        reader.prefetchStarted());
+      while (!reader.prefetchStarted()) {
+        assertTrue("Prefetch delay has not been expired yet",
+          getElapsedTime(startTime) < PrefetchExecutor.getPrefetchDelay());
+        // Poll with a short pause instead of spinning a core until the delay expires.
+        Thread.sleep(10);
+      }
+      // Added some delay as we have started the timer a bit late.
+      Thread.sleep(500);
+      assertTrue("Prefetch should start post configured delay",
+        getElapsedTime(startTime) > PrefetchExecutor.getPrefetchDelay());
+    } finally {
+      // The executor keeps these settings in static state shared with the other tests; restore
+      // the defaults even when an assertion above fails.
+      conf.setInt(PREFETCH_DELAY, 1000);
+      conf.setFloat(PREFETCH_DELAY_VARIATION, PREFETCH_DELAY_VARIATION_DEFAULT_VALUE);
+      prefetchExecutorNotifier.onConfigurationChange(conf);
+    }
+  }
+
   @Test
   public void testPrefetchDoesntSkipHFileLink() throws Exception {
     testPrefetchWhenHFileLink(c -> {
@@ -340,4 +391,8 @@ public class TestPrefetch {
       return keyType;
     }
   }
+
+  private long getElapsedTime(long startTime) {
+    return System.currentTimeMillis() - startTime;
+  }
 }

Reply via email to