This is an automated email from the ASF dual-hosted git repository.

pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c79d28bb7 Copy of #11309 with fixes (#12402)
2c79d28bb7 is described below

commit 2c79d28bb772567893433602c9a39acb0d188e10
Author: Parag Jain <pja...@apache.org>
AuthorDate: Mon Apr 11 21:05:24 2022 +0530

    Copy of #11309 with fixes (#12402)
    
    * Optionally load segment index files into page cache on bootstrap and new segment download
    
    * Fix unit test failure
    
    * Fix test case
    
    * fix spelling
    
    * fix spelling
    
    * fix test and test coverage issues
    
    Co-authored-by: Jian Wang <wjh...@gmail.com>
---
 docs/configuration/index.md                        |  2 +
 .../indexing/input/DruidSegmentReaderTest.java     |  7 +++
 .../druid/segment/loading/SegmentCacheManager.java | 12 ++++
 .../druid/segment/loading/SegmentLoader.java       | 13 ++++
 .../druid/segment/loading/SegmentLoaderConfig.java | 18 ++++++
 .../segment/loading/SegmentLocalCacheLoader.java   |  7 ++-
 .../segment/loading/SegmentLocalCacheManager.java  | 69 ++++++++++++++++++++++
 .../org/apache/druid/server/SegmentManager.java    | 16 ++++-
 .../coordination/SegmentLoadDropHandler.java       | 29 ++++++++-
 .../loading/CacheTestSegmentCacheManager.java      |  7 +++
 .../segment/loading/CacheTestSegmentLoader.java    |  8 +++
 .../loading/SegmentLocalCacheManagerTest.java      | 26 ++++++++
 .../apache/druid/server/SegmentManagerTest.java    |  6 ++
 .../coordination/SegmentLoadDropHandlerTest.java   | 11 ++--
 .../server/coordination/ServerManagerTest.java     |  6 ++
 15 files changed, 227 insertions(+), 10 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 21d09a6fc2..dfc7f17baf 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1596,6 +1596,8 @@ These Historical configurations can be defined in the `historical/runtime.proper
 |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`|
 |`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead.|false|
 |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2|
+|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload`|Number of threads to asynchronously read segment index files into a null output stream on each new segment download, after the Historical process finishes bootstrapping. Recommended to set to 1 or 2, or leave unspecified to disable. See also `druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap`.|0|
+|`druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap`|Number of threads to asynchronously read segment index files into a null output stream during Historical process bootstrap. This thread pool is terminated after the Historical process finishes bootstrapping. Recommended to set to half of the available cores. If left unspecified, `druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload` will be used. If both configs are unspecified, this feature is disabled. Preemptiv [...]
 
 In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any Druid bug leading to unaccounted segment files left alone on disk, or some other process writing stuff to disk, this check can start failing segment loading early before filling up the disk completely, leaving the host otherwise usable.
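
For illustration only (this snippet is not part of the patch): with the two new properties documented above, a Historical's `runtime.properties` might enable the feature along the following lines. The values are placeholders, not recommended defaults, and should be tuned per host.

    # hypothetical example values only
    # warm each newly downloaded segment with one background thread
    druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnDownload=1
    # use a larger temporary pool while the Historical bootstraps, e.g. half the cores
    druid.segmentCache.numThreadsToLoadSegmentsIntoPageCacheOnBootstrap=8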
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
index 62c2126df9..9301b891e4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java
@@ -66,6 +66,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 public class DruidSegmentReaderTest extends NullHandlingTest
 {
@@ -614,6 +615,12 @@ public class DruidSegmentReaderTest extends NullHandlingTest
           {
             throw new UnsupportedOperationException();
           }
+
+          @Override
+          public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+          {
+            throw new UnsupportedOperationException();
+          }
         },
         DataSegment.builder()
                    .dataSource("ds")
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
index 39ea785840..c11ab77ec7 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java
@@ -22,6 +22,7 @@ package org.apache.druid.segment.loading;
 import org.apache.druid.timeline.DataSegment;
 
 import java.io.File;
+import java.util.concurrent.ExecutorService;
 
 /**
  * A class to fetch segment files to local disk and manage the local cache.
@@ -84,4 +85,15 @@ public interface SegmentCacheManager
    * explicitly reserved via {@link #reserve(DataSegment)}
    */
   void cleanup(DataSegment segment);
+
+  /**
+   * Asynchronously load a segment into the page cache.
+   * Equivalent to `cat segment_files > /dev/null`: force-loads the segment index files into the page cache so that
+   * later, when the segment is queried, they are already in the page cache and only a minor page fault needs to be
+   * triggered instead of a major page fault, making the query latency more consistent.
+   *
+   * @param segment The segment whose index files should be loaded into the page cache
+   * @param exec The thread pool to use
+   */
+  void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec);
 }
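
As a side note, the warm-up described in the Javadoc above can be sketched with a small, self-contained JDK-only example. The class and method names below are made up for illustration; the patch itself implements this with commons-io (`IOUtils.copy` into a `NullOutputStream`) inside `SegmentLocalCacheManager`, shown further down.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    // Illustration of "cat segment_files > /dev/null": streaming every file under a
    // segment directory into a discarding sink makes the kernel read the bytes, so they
    // sit in the page cache before the first query touches them.
    public class PageCacheWarmupSketch
    {
      public static void warm(Path segmentDir) throws IOException
      {
        try (Stream<Path> files = Files.walk(segmentDir)) {
          files.filter(Files::isRegularFile).forEach(file -> {
            try (InputStream in = Files.newInputStream(file);
                 OutputStream sink = OutputStream.nullOutputStream()) { // JDK 11+
              in.transferTo(sink); // bytes are discarded; the read populates the page cache
            }
            catch (IOException e) {
              // warming is best effort; a failure here should not fail segment loading
            }
          });
        }
      }
    }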
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
index 03bc0506f7..65ac548f25 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java
@@ -24,6 +24,8 @@ import org.apache.druid.segment.ReferenceCountingSegment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.timeline.DataSegment;
 
+import java.util.concurrent.ExecutorService;
+
 /**
 * Loading segments from deep storage to local storage. Internally, this class can delegate the download to
  * {@link SegmentCacheManager}. Implementations must be thread-safe.
@@ -52,4 +54,15 @@ public interface SegmentLoader
    * cleanup any state used by this segment
    */
   void cleanup(DataSegment segment);
+
+  /**
+   * Asynchronously load a segment into the page cache.
+   * Equivalent to `cat segment_files > /dev/null`: force-loads the segment index files into the page cache so that
+   * later, when the segment is queried, they are already in the page cache and only a minor page fault needs to be
+   * triggered instead of a major page fault, making the query latency more consistent.
+   *
+   * @param segment The segment whose index files should be loaded into the page cache
+   * @param exec The thread pool to use
+   */
+  void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec);
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
index bc0b79c39f..2e01c7db9e 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java
@@ -56,6 +56,12 @@ public class SegmentLoaderConfig
   @JsonProperty("numBootstrapThreads")
   private Integer numBootstrapThreads = null;
 
+  @JsonProperty("numThreadsToLoadSegmentsIntoPageCacheOnDownload")
+  private int numThreadsToLoadSegmentsIntoPageCacheOnDownload = 0;
+
+  @JsonProperty("numThreadsToLoadSegmentsIntoPageCacheOnBootstrap")
+  private Integer numThreadsToLoadSegmentsIntoPageCacheOnBootstrap = null;
+
   @JsonProperty
   private File infoDir = null;
 
@@ -99,6 +105,18 @@ public class SegmentLoaderConfig
     return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
   }
 
+  public int getNumThreadsToLoadSegmentsIntoPageCacheOnDownload()
+  {
+    return numThreadsToLoadSegmentsIntoPageCacheOnDownload;
+  }
+
+  public int getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap()
+  {
+    return numThreadsToLoadSegmentsIntoPageCacheOnBootstrap == null ?
+           numThreadsToLoadSegmentsIntoPageCacheOnDownload :
+           numThreadsToLoadSegmentsIntoPageCacheOnBootstrap;
+  }
+
   public File getInfoDir()
   {
     if (infoDir == null) {
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
index 77040c4154..b6186dbf48 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java
@@ -31,6 +31,7 @@ import org.apache.druid.timeline.DataSegment;
 import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
 
 public class SegmentLocalCacheLoader implements SegmentLoader
 {
@@ -78,5 +79,9 @@ public class SegmentLocalCacheLoader implements SegmentLoader
     cacheManager.cleanup(segment);
   }
 
-
+  @Override
+  public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+  {
+    cacheManager.loadSegmentIntoPageCache(segment, exec);
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
index 0ad2901e0b..c6c216594e 100644
--- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
+++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java
@@ -22,9 +22,12 @@ package org.apache.druid.segment.loading;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
 
@@ -32,10 +35,14 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  *
@@ -79,6 +86,8 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
 
   private final StorageLocationSelectorStrategy strategy;
 
+  private ExecutorService loadSegmentsIntoPageCacheOnDownloadExec = null;
+
   // Note that we only create this via injection in historical and realtime nodes. Peons create these
   // objects via SegmentCacheManagerFactory objects, so that they can store segments in task-specific
   // directories rather than statically configured directories.
@@ -95,6 +104,14 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
     this.locations = locations;
     this.strategy = strategy;
     log.info("Using storage location strategy: [%s]", 
this.strategy.getClass().getSimpleName());
+
+    if (this.config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload() != 0) {
+      loadSegmentsIntoPageCacheOnDownloadExec = Executors.newFixedThreadPool(
+          config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload(),
+          Execs.makeThreadFactory("LoadSegmentsIntoPageCacheOnDownload-%s"));
+      log.info("Size of thread pool to load segments into page cache on download [%d]",
+               config.getNumThreadsToLoadSegmentsIntoPageCacheOnDownload());
+    }
   }
 
   @VisibleForTesting
@@ -436,6 +453,58 @@ public class SegmentLocalCacheManager implements SegmentCacheManager
     }
   }
 
+  @Override
+  public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+  {
+    ExecutorService execToUse = exec != null ? exec : loadSegmentsIntoPageCacheOnDownloadExec;
+    if (execToUse == null) {
+      return;
+    }
+
+    execToUse.submit(
+        () -> {
+          final ReferenceCountingLock lock = createOrGetLock(segment);
+          synchronized (lock) {
+            try {
+              for (StorageLocation location : locations) {
+                File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false));
+                if (localStorageDir.exists()) {
+                  File baseFile = location.getPath();
+                  if (localStorageDir.equals(baseFile)) {
+                    continue;
+                  }
+
+                  log.info("Loading directory[%s] into page cache", 
localStorageDir);
+
+                  File[] children = localStorageDir.listFiles();
+                  if (children != null) {
+                    for (File child : children) {
+                      InputStream in = null;
+                      try {
+                        in = new FileInputStream(child);
+                        IOUtils.copy(in, new NullOutputStream());
+
+                        log.info("Loaded [%s] into page cache", 
child.getAbsolutePath());
+                      }
+                      catch (Exception e) {
+                        log.error("Failed to load [%s] into page cache, [%s]", 
child.getAbsolutePath(), e.getMessage());
+                      }
+                      finally {
+                        IOUtils.closeQuietly(in);
+                      }
+                    }
+                  }
+                }
+              }
+            }
+            finally {
+              unlock(segment, lock);
+            }
+          }
+        }
+    );
+  }
+
   private void cleanupCacheFiles(File baseFile, File cacheFile)
   {
     if (cacheFile.equals(baseFile)) {
diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java
index dcd5a1d151..329fc16248 100644
--- a/server/src/main/java/org/apache/druid/server/SegmentManager.java
+++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java
@@ -48,6 +48,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -203,18 +204,29 @@ public class SegmentManager
                    .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource()));
   }
 
+  public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed)
+      throws SegmentLoadingException
+  {
+    return loadSegment(segment, lazy, loadFailed, null);
+  }
+
   /**
    * Load a single segment.
    *
    * @param segment segment to load
    * @param lazy    whether to lazy load columns metadata
    * @param loadFailed callBack to execute when segment lazy load failed
+   * @param loadSegmentIntoPageCacheExec If null, the segment loader's default thread pool for loading segments
+   *                                     into the page cache on download is used. A dedicated thread pool of larger
+   *                                     capacity can be passed when this method is called during Historical process
+   *                                     bootstrap, to speed up initial loading.
    *
    * @return true if the segment was newly loaded, false if it was already loaded
    *
    * @throws SegmentLoadingException if the segment cannot be loaded
    */
-  public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
+  public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed,
+                             ExecutorService loadSegmentIntoPageCacheExec) throws SegmentLoadingException
   {
     final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed);
 
@@ -254,6 +266,8 @@ public class SegmentManager
                 segment.getShardSpec().createChunk(adapter)
             );
             dataSourceState.addSegment(segment);
+            // Asynchronously load the segment index files into the page cache on a thread pool
+            segmentLoader.loadSegmentIntoPageCache(segment, loadSegmentIntoPageCacheExec);
             resultSupplier.set(true);
 
           }
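
For context on the new 4-argument overload above: a caller-side sketch (a hypothetical helper; class and variable names are invented here) might pass a dedicated pool while bootstrapping and `null` afterwards, in which case `SegmentLocalCacheManager` falls back to its own "on download" pool, or skips the warm-up entirely if that pool is not configured.

    import java.util.List;
    import java.util.concurrent.ExecutorService;

    import org.apache.druid.java.util.common.concurrent.Execs;
    import org.apache.druid.segment.loading.SegmentLoadingException;
    import org.apache.druid.server.SegmentManager;
    import org.apache.druid.timeline.DataSegment;

    // Hypothetical bootstrap-time helper: load segments and warm them via a dedicated pool.
    public class BootstrapWarmupSketch
    {
      public static void loadAll(SegmentManager segmentManager, List<DataSegment> segments)
      {
        ExecutorService warmupPool = Execs.multiThreaded(4, "Warm-Segments-On-Bootstrap-%s");
        try {
          for (DataSegment segment : segments) {
            try {
              // lazy = false; the lazy-load-failure callback is a no-op lambda here
              segmentManager.loadSegment(segment, false, () -> {}, warmupPool);
            }
            catch (SegmentLoadingException e) {
              // log and continue; load failures are handled per segment
            }
          }
        }
        finally {
          // shutdown(), not shutdownNow(): queued warm-up tasks still drain before threads exit
          warmupPool.shutdown();
        }
      }
    }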
diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
index e723d6dd8c..5d4bf76018 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java
@@ -259,20 +259,28 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     );
   }
 
+  private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy)
+      throws SegmentLoadingException
+  {
+    loadSegment(segment, callback, lazy, null);
+  }
+
   /**
    * Load a single segment. If the segment is loaded successfully, this function simply returns. Otherwise it will
    * throw a SegmentLoadingException
    *
    * @throws SegmentLoadingException if it fails to load the given segment
    */
-  private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy)
+  private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy, @Nullable
+      ExecutorService loadSegmentIntoPageCacheExec)
       throws SegmentLoadingException
   {
     final boolean loaded;
     try {
       loaded = segmentManager.loadSegment(segment,
               lazy,
-          () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false)
+          () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false),
+              loadSegmentIntoPageCacheExec
       );
     }
     catch (Exception e) {
@@ -346,9 +354,19 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
     }
   }
 
+  /**
+   * Bulk adding segments during bootstrap
+   * @param segments A collection of segments to add
+   * @param callback Segment loading callback
+   */
   private void addSegments(Collection<DataSegment> segments, final DataSegmentChangeCallback callback)
   {
+    // Start a temporary thread pool to load segments into page cache during bootstrap
     ExecutorService loadingExecutor = null;
+    ExecutorService loadSegmentsIntoPageCacheOnBootstrapExec =
+        config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap() != 0 ?
+        Execs.multiThreaded(config.getNumThreadsToLoadSegmentsIntoPageCacheOnBootstrap(),
+                            "Load-Segments-Into-Page-Cache-On-Bootstrap-%s") : null;
     try (final BackgroundSegmentAnnouncer backgroundSegmentAnnouncer =
             new BackgroundSegmentAnnouncer(announcer, exec, config.getAnnounceIntervalMillis())) {
 
@@ -370,7 +388,7 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
                     numSegments,
                     segment.getId()
                 );
-                loadSegment(segment, callback, config.isLazyLoadOnStart());
+                loadSegment(segment, callback, config.isLazyLoadOnStart(), loadSegmentsIntoPageCacheOnBootstrapExec);
                 try {
                   backgroundSegmentAnnouncer.announceSegment(segment);
                 }
@@ -416,6 +434,11 @@ public class SegmentLoadDropHandler implements DataSegmentChangeHandler
       if (loadingExecutor != null) {
         loadingExecutor.shutdownNow();
       }
+      if (loadSegmentsIntoPageCacheOnBootstrapExec != null) {
+        // At this stage all tasks have been submitted; send a shutdown command to the bootstrap
+        // thread pool so its threads exit after finishing the queued tasks
+        loadSegmentsIntoPageCacheOnBootstrapExec.shutdown();
+      }
     }
   }
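
The shutdown() call above (rather than shutdownNow()) is what lets warm-up work queued during bootstrap finish in the background. A tiny self-contained JDK example of that semantic difference, unrelated to any Druid class:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // shutdown() stops accepting new tasks but lets already-submitted tasks run to
    // completion; shutdownNow() would discard whatever is still sitting in the queue.
    public class ShutdownSemanticsDemo
    {
      public static void main(String[] args) throws InterruptedException
      {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
          final int task = i;
          pool.submit(() -> System.out.println("warmed task " + task));
        }
        pool.shutdown();                          // all 10 queued tasks still execute
        pool.awaitTermination(1, TimeUnit.MINUTES);
      }
    }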
 
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
index ca314b97b9..d546e0eb9f 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java
@@ -26,6 +26,7 @@ import java.io.File;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 /**
  *
@@ -69,4 +70,10 @@ public class CacheTestSegmentCacheManager implements SegmentCacheManager
   {
     return segmentsInTrash;
   }
+
+  @Override
+  public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+  {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
index 831a1a434b..941b459c73 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java
@@ -28,6 +28,8 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
+import java.util.concurrent.ExecutorService;
+
 /**
 */
 public class CacheTestSegmentLoader implements SegmentLoader
@@ -70,6 +72,12 @@ public class CacheTestSegmentLoader implements SegmentLoader
     return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec());
   }
 
+  @Override
+  public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+  {
+
+  }
+
   @Override
   public void cleanup(DataSegment segment)
   {
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
index 13a995f361..e207f792c0 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java
@@ -44,6 +44,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Executors;
 
 public class SegmentLocalCacheManagerTest
 {
@@ -100,6 +101,31 @@ public class SegmentLocalCacheManagerTest
     Assert.assertFalse("Expect cache miss", 
manager.isSegmentCached(uncachedSegment));
   }
 
+  @Test
+  public void testNoLoadingOfSegmentInPageCache() throws IOException
+  {
+    final DataSegment segment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D");
+    final File segmentFile = new File(
+        localSegmentCacheFolder,
+        "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
+    );
+    FileUtils.mkdirp(segmentFile);
+    // should not throw any exception
+    manager.loadSegmentIntoPageCache(segment, null);
+  }
+
+  @Test
+  public void testLoadSegmentInPageCache() throws IOException
+  {
+    final DataSegment segment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D");
+    final File segmentFile = new File(
+        localSegmentCacheFolder,
+        "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"
+    );
+    FileUtils.mkdirp(segmentFile);
+    // should not throw any exception
+    manager.loadSegmentIntoPageCache(segment, Executors.newSingleThreadExecutor());
+  }
 
   @Test
   public void testIfTombstoneIsLoaded() throws IOException, SegmentLoadingException
diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
index 83f0a8ff1e..a64078b4d7 100644
--- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java
@@ -77,6 +77,12 @@ public class SegmentManagerTest
     {
 
     }
+
+    @Override
+    public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+    {
+
+    }
   };
 
   private static class SegmentForTesting implements Segment
diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
index d03bc52cdd..492b788c54 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java
@@ -519,7 +519,8 @@ public class SegmentLoadDropHandlerTest
   public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception
   {
     final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
-    Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
+    Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(),
+                                            ArgumentMatchers.any(), ArgumentMatchers.any()))
            .thenThrow(new RuntimeException("segment loading failure test"))
            .thenReturn(true);
     final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
@@ -562,7 +563,7 @@ public class SegmentLoadDropHandlerTest
   public void testProcessBatchLoadDropLoadSequenceForSameSegment() throws Exception
   {
     final SegmentManager segmentManager = Mockito.mock(SegmentManager.class);
-    Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any()))
+    Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any()))
           .thenReturn(true);
     Mockito.doNothing().when(segmentManager).dropSegment(ArgumentMatchers.any());
     final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler(
@@ -603,7 +604,7 @@ public class SegmentLoadDropHandlerTest
     scheduledRunnable.clear();
 
     // check invocations after a load-drop sequence
-    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
     Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
 
     // try to reload the segment - this should be a no-op since it might be the case that this is the first load client
@@ -615,7 +616,7 @@ public class SegmentLoadDropHandlerTest
     Assert.assertEquals(STATE.SUCCESS, result.get(0).getStatus().getState());
 
     // check invocations - should stay the same
-    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(1)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
     Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
 
     // try to reload the segment - this time the loader will know that it is a fresh request to load
@@ -630,7 +631,7 @@ public class SegmentLoadDropHandlerTest
     scheduledRunnable.clear();
 
     // check invocations - the load segment counter should bump up
-    Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
+    Mockito.verify(segmentManager, Mockito.times(2)).loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any(), ArgumentMatchers.any());
     Mockito.verify(segmentManager, Mockito.times(1)).dropSegment(ArgumentMatchers.any());
 
     segmentLoadDropHandler.stop();
diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
index 79222a5327..bc16274aab 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java
@@ -168,6 +168,12 @@ public class ServerManagerTest
           {
 
           }
+
+          @Override
+          public void loadSegmentIntoPageCache(DataSegment segment, ExecutorService exec)
+          {
+
+          }
         }
     );
     serverManager = new ServerManager(

