This is an automated email from the ASF dual-hosted git repository.

broustant pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 3adb2be366a SOLR-17671: Replication and Backup use an unwrapped Directory to copy files. (#3185)
3adb2be366a is described below

commit 3adb2be366aa8886811ef4f743ff6ef77beee08e
Author: Bruno Roustant <[email protected]>
AuthorDate: Mon Feb 17 18:25:56 2025 +0100

    SOLR-17671: Replication and Backup use an unwrapped Directory to copy files. (#3185)
    
    New extensible method CachingDirectoryFactory.filterDirectory.
---
 solr/CHANGES.txt                                   |  3 +
 .../apache/solr/core/CachingDirectoryFactory.java  | 73 +++++++++++++++++-----
 .../org/apache/solr/core/DirectoryFactory.java     | 12 +++-
 .../solr/handler/IncrementalShardBackup.java       |  2 +-
 .../solr/handler/admin/api/ReplicationAPIBase.java | 30 ++++++---
 5 files changed, 91 insertions(+), 29 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index eb179816e72..29902084134 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,9 @@ Other Changes
 * SOLR-17648: multiThreaded=true: changed queue implementation from unlimited to 1000 max, after
   which the caller thread will execute.  (David Smiley)
 
+* SOLR-17671: Replication and backup now use dedicated DirectoryFactory.DirContext values (REPLICATION, BACKUP)
+  so that the Directory they copy files from is unwrapped. (Bruno Roustant, David Smiley)
+
+
 ==================  9.8.0 ==================
 New Features
 ---------------------
diff --git 
a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java 
b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index dfe9487809e..8dd86e6d4f4 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -30,10 +30,12 @@ import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.util.IOUtils;
 import org.apache.solr.common.SolrException;
@@ -60,8 +62,8 @@ public abstract class CachingDirectoryFactory extends 
DirectoryFactory {
     private boolean deleteOnClose = false;
 
     public CacheValue(String path, Directory directory) {
-      this.path = path;
-      this.directory = directory;
+      this.path = Objects.requireNonNull(path);
+      this.directory = Objects.requireNonNull(directory);
       this.closeEntries.add(this);
       // for debug
       // this.originTrace = new RuntimeException("Originated from:");
@@ -395,25 +397,22 @@ public abstract class CachingDirectoryFactory extends 
DirectoryFactory {
   public final Directory get(String path, DirContext dirContext, String 
rawLockType)
       throws IOException {
     String fullPath = normalize(path);
+    Directory directory;
+    CacheValue cacheValue;
     synchronized (this) {
       if (closed) {
         throw new AlreadyClosedException("Already closed");
       }
 
-      final CacheValue cacheValue = byPathCache.get(fullPath);
-      Directory directory = null;
-      if (cacheValue != null) {
-        directory = cacheValue.directory;
-      }
-
-      if (directory == null) {
+      cacheValue = byPathCache.get(fullPath);
+      if (cacheValue == null) {
         directory = create(fullPath, createLockFactory(rawLockType), 
dirContext);
         assert ObjectReleaseTracker.track(directory);
         boolean success = false;
         try {
-          CacheValue newCacheValue = new CacheValue(fullPath, directory);
-          byDirectoryCache.put(directory, newCacheValue);
-          byPathCache.put(fullPath, newCacheValue);
+          cacheValue = new CacheValue(fullPath, directory);
+          byDirectoryCache.put(directory, cacheValue);
+          byPathCache.put(fullPath, cacheValue);
           log.debug("return new directory for {}", fullPath);
           success = true;
         } finally {
@@ -422,12 +421,34 @@ public abstract class CachingDirectoryFactory extends 
DirectoryFactory {
           }
         }
       } else {
+        directory = cacheValue.directory;
         cacheValue.refCnt++;
         log.debug("Reusing cached directory: {}", cacheValue);
       }
-
-      return directory;
     }
+
+    Directory filteredDir = filterDirectory(directory, dirContext);
+    // If the directory is filtered/unwrapped, we need to wrap it in a 
ReleasableDirectory
+    // form to be able to recognize it when release(Directory) is called.
+    return filteredDir == directory ? directory : new 
ReleasableDirectory(filteredDir, cacheValue);
+  }
+
+  /**
+   * Potentially filters or unwraps the cached {@link Directory} depending on 
the intended use
+   * defined by the {@link org.apache.solr.core.DirectoryFactory.DirContext}.
+   *
+   * @param dir the {@link Directory} cached by this {@link 
CachingDirectoryFactory}.
+   * @param dirContext the nature or the intended use of the directory.
+   * @return a filtered or unwrapped version of the directory parameter, or 
directly the directory
+   *     parameter if it does not need any filtering/unwrapping.
+   */
+  protected Directory filterDirectory(Directory dir, DirContext dirContext) {
+    // If the DirContext is REPLICATION or BACKUP, then unwrap the Directory 
to allow the caller to
+    // copy raw bytes, skipping any additional logic that would be added by a 
FilterDirectory on top
+    // of the raw Directory.
+    return dirContext == DirContext.REPLICATION || dirContext == 
DirContext.BACKUP
+        ? FilterDirectory.unwrap(dir)
+        : dir;
   }
 
   /*
@@ -488,8 +509,13 @@ public abstract class CachingDirectoryFactory extends 
DirectoryFactory {
 
       CacheValue cacheValue = byDirectoryCache.get(directory);
       if (cacheValue == null) {
-        throw new IllegalArgumentException(
-            "Unknown directory: " + directory + " " + byDirectoryCache);
+        // The directory is not registered, it is a ReleasableDirectory 
wrapper.
+        try {
+          cacheValue = ((ReleasableDirectory) directory).cacheValue;
+        } catch (ClassCastException e) {
+          throw new IllegalArgumentException(
+              "Unknown directory: " + directory + " " + byDirectoryCache);
+        }
       }
       if (log.isDebugEnabled()) {
         log.debug(
@@ -592,4 +618,19 @@ public abstract class CachingDirectoryFactory extends 
DirectoryFactory {
   protected synchronized String getPath(Directory directory) {
     return byDirectoryCache.get(directory).path;
   }
+
+  /**
+   * Delegates to a filtered or unwrapped directory, and allows this caching 
factory to release
+   * correctly the corresponding cached directory.
+   */
+  public static class ReleasableDirectory extends FilterDirectory {
+
+    private final CacheValue cacheValue;
+
+    private ReleasableDirectory(Directory filteredDir, CacheValue cacheValue) {
+      super(filteredDir);
+      assert cacheValue != null;
+      this.cacheValue = cacheValue;
+    }
+  }
 }
diff --git a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java 
b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
index e1d22572bb5..a446b2453ae 100644
--- a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
@@ -57,10 +57,16 @@ public abstract class DirectoryFactory implements 
NamedListInitializedPlugin, Cl
   // Absolute.
   protected Path dataHomePath;
 
-  // hint about what the directory contains - default is index directory
+  /** Hint about what the directory will be used for. */
   public enum DirContext {
+    /** Directory used to read or write the index. */
     DEFAULT,
-    META_DATA
+    /** Directory used to read or write metadata. */
+    META_DATA,
+    /** Directory used to copy raw files during replication. */
+    REPLICATION,
+    /** Directory used to copy raw files during backup. */
+    BACKUP,
   }
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -98,6 +104,8 @@ public abstract class DirectoryFactory implements 
NamedListInitializedPlugin, Cl
    *
    * @throws IOException If there is a low-level I/O error.
    */
+  // TODO: remove the DirContext param from this method and have the 
DirectoryFactory implementation
+  // extend the new CachingDirectoryFactory.filterDirectory if needed.
   protected abstract Directory create(String path, LockFactory lockFactory, 
DirContext dirContext)
       throws IOException;
 
diff --git 
a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java 
b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
index c4de4807588..0e07ac0ca28 100644
--- a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
+++ b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -150,7 +150,7 @@ public class IncrementalShardBackup {
             .getDirectoryFactory()
             .get(
                 solrCore.getIndexDir(),
-                DirectoryFactory.DirContext.DEFAULT,
+                DirectoryFactory.DirContext.BACKUP,
                 solrCore.getSolrConfig().indexConfig.lockType);
     try {
       BackupStats stats = incrementalCopy(files, dir);
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java 
b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
index 638312e592d..4ef50072bfc 100644
--- 
a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
+++ 
b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
@@ -51,6 +51,7 @@ import org.apache.solr.client.api.model.FileMetaData;
 import org.apache.solr.client.api.model.IndexVersionResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
@@ -160,13 +161,7 @@ public abstract class ReplicationAPIBase extends 
JerseyResource {
       List<FileMetaData> result = new ArrayList<>();
       Directory dir = null;
       try {
-        dir =
-            solrCore
-                .getDirectoryFactory()
-                .get(
-                    solrCore.getNewIndexDir(),
-                    DirectoryFactory.DirContext.DEFAULT,
-                    solrCore.getSolrConfig().indexConfig.lockType);
+        dir = getDirectory();
         SegmentInfos infos = SegmentInfos.readCommit(dir, 
commit.getSegmentsFileName());
         for (SegmentCommitInfo commitInfo : infos) {
           for (String file : commitInfo.files()) {
@@ -246,6 +241,15 @@ public abstract class ReplicationAPIBase extends 
JerseyResource {
     return filesResponse;
   }
 
+  private Directory getDirectory() throws IOException {
+    return solrCore
+        .getDirectoryFactory()
+        .get(
+            solrCore.getNewIndexDir(),
+            DirectoryFactory.DirContext.REPLICATION,
+            solrCore.getSolrConfig().indexConfig.lockType);
+  }
+
   /** This class is used to read and send files in the lucene index */
   protected class DirectoryFileStream implements SolrCore.RawWriter, 
StreamingOutput {
     protected FastOutputStream fos;
@@ -373,11 +377,12 @@ public abstract class ReplicationAPIBase extends 
JerseyResource {
     public void write(OutputStream out) throws IOException {
       createOutputStream(out);
 
+      Directory dir = null;
       IndexInput in = null;
       try {
         initWrite();
 
-        Directory dir = solrCore.withSearcher(searcher -> 
searcher.getIndexReader().directory());
+        dir = getDirectory();
         in = dir.openInput(fileName, IOContext.READONCE);
         // if offset is mentioned move the pointer to that point
         if (offset != -1) in.seek(offset);
@@ -428,8 +433,13 @@ public abstract class ReplicationAPIBase extends 
JerseyResource {
             indexGen,
             useChecksum);
       } finally {
-        if (in != null) {
-          in.close();
+        IOUtils.closeQuietly(in);
+        if (dir != null) {
+          try {
+            solrCore.getDirectoryFactory().release(dir);
+          } catch (IOException e) {
+            log.error("Could not release directory after streaming file", e);
+          }
         }
         extendReserveAndReleaseCommitPoint();
       }

Reply via email to