This is an automated email from the ASF dual-hosted git repository.
broustant pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new 3adb2be366a SOLR-17671: Replication and Backup use an unwrapped Directory to copy files. (#3185)
3adb2be366a is described below
commit 3adb2be366aa8886811ef4f743ff6ef77beee08e
Author: Bruno Roustant <[email protected]>
AuthorDate: Mon Feb 17 18:25:56 2025 +0100
SOLR-17671: Replication and Backup use an unwrapped Directory to copy files. (#3185)
New extensible method CachingDirectoryFactory.filterDirectory.
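
For illustration only (not part of this commit): a DirectoryFactory subclass could hook into the new extension point roughly as below. The subclass name is hypothetical; it keeps the default unwrapping for REPLICATION/BACKUP and opts out for META_DATA.

    import org.apache.lucene.store.Directory;
    import org.apache.solr.core.StandardDirectoryFactory;

    // Hypothetical example subclass; only the override is the point here.
    public class MyDirectoryFactory extends StandardDirectoryFactory {
      @Override
      protected Directory filterDirectory(Directory dir, DirContext dirContext) {
        if (dirContext == DirContext.META_DATA) {
          return dir; // keep any FilterDirectory layers when reading/writing metadata
        }
        return super.filterDirectory(dir, dirContext); // default: unwrap for REPLICATION/BACKUP
      }
    }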
---
solr/CHANGES.txt | 3 +
.../apache/solr/core/CachingDirectoryFactory.java | 73 +++++++++++++++++-----
.../org/apache/solr/core/DirectoryFactory.java | 12 +++-
.../solr/handler/IncrementalShardBackup.java | 2 +-
.../solr/handler/admin/api/ReplicationAPIBase.java | 30 ++++++---
5 files changed, 91 insertions(+), 29 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index eb179816e72..29902084134 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -88,6 +88,9 @@ Other Changes
* SOLR-17648: multiThreaded=true: changed queue implementation from unlimited to 1000 max, after
  which the caller thread will execute. (David Smiley)
+* SOLR-17671: Replication and backup have their DirectoryFactory.DirContext so the directory they use is unwrapped
+ when copying files. (Bruno Roustant, David Smiley)
+
================== 9.8.0 ==================
New Features
---------------------
diff --git a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
index dfe9487809e..8dd86e6d4f4 100644
--- a/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/CachingDirectoryFactory.java
@@ -30,10 +30,12 @@ import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.util.IOUtils;
import org.apache.solr.common.SolrException;
@@ -60,8 +62,8 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
private boolean deleteOnClose = false;
public CacheValue(String path, Directory directory) {
- this.path = path;
- this.directory = directory;
+ this.path = Objects.requireNonNull(path);
+ this.directory = Objects.requireNonNull(directory);
this.closeEntries.add(this);
// for debug
// this.originTrace = new RuntimeException("Originated from:");
@@ -395,25 +397,22 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
public final Directory get(String path, DirContext dirContext, String rawLockType)
throws IOException {
String fullPath = normalize(path);
+ Directory directory;
+ CacheValue cacheValue;
synchronized (this) {
if (closed) {
throw new AlreadyClosedException("Already closed");
}
- final CacheValue cacheValue = byPathCache.get(fullPath);
- Directory directory = null;
- if (cacheValue != null) {
- directory = cacheValue.directory;
- }
-
- if (directory == null) {
+ cacheValue = byPathCache.get(fullPath);
+ if (cacheValue == null) {
directory = create(fullPath, createLockFactory(rawLockType), dirContext);
assert ObjectReleaseTracker.track(directory);
boolean success = false;
try {
- CacheValue newCacheValue = new CacheValue(fullPath, directory);
- byDirectoryCache.put(directory, newCacheValue);
- byPathCache.put(fullPath, newCacheValue);
+ cacheValue = new CacheValue(fullPath, directory);
+ byDirectoryCache.put(directory, cacheValue);
+ byPathCache.put(fullPath, cacheValue);
log.debug("return new directory for {}", fullPath);
success = true;
} finally {
@@ -422,12 +421,34 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
}
}
} else {
+ directory = cacheValue.directory;
cacheValue.refCnt++;
log.debug("Reusing cached directory: {}", cacheValue);
}
-
- return directory;
}
+
+ Directory filteredDir = filterDirectory(directory, dirContext);
+ // If the directory is filtered/unwrapped, we need to wrap it in a ReleasableDirectory
+ // form to be able to recognize it when release(Directory) is called.
+ return filteredDir == directory ? directory : new ReleasableDirectory(filteredDir, cacheValue);
+ }
+
+ /**
+ * Potentially filters or unwraps the cached {@link Directory} depending on the intended use
+ * defined by the {@link org.apache.solr.core.DirectoryFactory.DirContext}.
+ *
+ * @param dir the {@link Directory} cached by this {@link CachingDirectoryFactory}.
+ * @param dirContext the nature or the intended use of the directory.
+ * @return a filtered or unwrapped version of the directory parameter, or directly the directory
+ * parameter if it does not need any filtering/unwrapping.
+ */
+ protected Directory filterDirectory(Directory dir, DirContext dirContext) {
+ // If the DirContext is REPLICATION or BACKUP, then unwrap the Directory to allow the caller to
+ // copy raw bytes, skipping any additional logic that would be added by a FilterDirectory on top
+ // of the raw Directory.
+ return dirContext == DirContext.REPLICATION || dirContext == DirContext.BACKUP
+ ? FilterDirectory.unwrap(dir)
+ : dir;
}
/*
@@ -488,8 +509,13 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
CacheValue cacheValue = byDirectoryCache.get(directory);
if (cacheValue == null) {
- throw new IllegalArgumentException(
- "Unknown directory: " + directory + " " + byDirectoryCache);
+ // The directory is not registered, it is a ReleasableDirectory wrapper.
+ try {
+ cacheValue = ((ReleasableDirectory) directory).cacheValue;
+ } catch (ClassCastException e) {
+ throw new IllegalArgumentException(
+ "Unknown directory: " + directory + " " + byDirectoryCache);
+ }
}
if (log.isDebugEnabled()) {
log.debug(
@@ -592,4 +618,19 @@ public abstract class CachingDirectoryFactory extends DirectoryFactory {
protected synchronized String getPath(Directory directory) {
return byDirectoryCache.get(directory).path;
}
+
+ /**
+ * Delegates to a filtered or unwrapped directory, and allows this caching factory to release
+ * correctly the corresponding cached directory.
+ */
+ public static class ReleasableDirectory extends FilterDirectory {
+
+ private final CacheValue cacheValue;
+
+ private ReleasableDirectory(Directory filteredDir, CacheValue cacheValue) {
+ super(filteredDir);
+ assert cacheValue != null;
+ this.cacheValue = cacheValue;
+ }
+ }
}
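
Caller-side the contract is unchanged: obtain a Directory from the factory with the appropriate DirContext and release it through the factory. A rough sketch of that usage (the helper method is hypothetical and shown without its enclosing class; the accessors are the ones the handlers in this commit call):

    import java.io.IOException;
    import org.apache.lucene.store.Directory;
    import org.apache.solr.core.DirectoryFactory;
    import org.apache.solr.core.SolrCore;

    void withReplicationDirectory(SolrCore core) throws IOException {
      DirectoryFactory factory = core.getDirectoryFactory();
      Directory dir =
          factory.get(
              core.getIndexDir(),
              DirectoryFactory.DirContext.REPLICATION,
              core.getSolrConfig().indexConfig.lockType);
      try {
        // dir is either the cached Directory itself (nothing was unwrapped) or a
        // ReleasableDirectory wrapping the unwrapped one; file bytes read from it
        // bypass any FilterDirectory layers.
        // ... copy files here ...
      } finally {
        factory.release(dir); // release() resolves the CacheValue in both cases
      }
    }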
diff --git a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
index e1d22572bb5..a446b2453ae 100644
--- a/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
+++ b/solr/core/src/java/org/apache/solr/core/DirectoryFactory.java
@@ -57,10 +57,16 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin, Cl
// Absolute.
protected Path dataHomePath;
- // hint about what the directory contains - default is index directory
+ /** Hint about what the directory will be used for. */
public enum DirContext {
+ /** Directory used to read or write the index. */
DEFAULT,
- META_DATA
+ /** Directory used to read or write metadata. */
+ META_DATA,
+ /** Directory used to copy raw files during replication. */
+ REPLICATION,
+ /** Directory used to copy raw files during backup. */
+ BACKUP,
}
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -98,6 +104,8 @@ public abstract class DirectoryFactory implements NamedListInitializedPlugin, Cl
*
* @throws IOException If there is a low-level I/O error.
*/
+ // TODO: remove the DirContext param from this method and have the DirectoryFactory implementation
+ // extend the new CachingDirectoryFactory.filterDirectory if needed.
protected abstract Directory create(String path, LockFactory lockFactory, DirContext dirContext)
throws IOException;
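
For reference, FilterDirectory.unwrap (used by the default filterDirectory above) peels off any FilterDirectory layers and returns the innermost delegate. A small standalone illustration, with arbitrary directory types and path:

    import java.io.IOException;
    import java.nio.file.Path;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.FilterDirectory;
    import org.apache.lucene.store.MMapDirectory;
    import org.apache.lucene.store.NRTCachingDirectory;

    public class UnwrapDemo {
      public static void main(String[] args) throws IOException {
        Directory raw = new MMapDirectory(Path.of("/tmp/example-index")); // arbitrary path
        Directory wrapped = new NRTCachingDirectory(raw, 4.0, 48.0);      // a FilterDirectory layer
        // REPLICATION/BACKUP contexts copy from the innermost delegate:
        System.out.println(FilterDirectory.unwrap(wrapped) == raw);       // true
        // A non-FilterDirectory is returned as-is:
        System.out.println(FilterDirectory.unwrap(raw) == raw);           // true
        wrapped.close();
      }
    }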
diff --git a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
index c4de4807588..0e07ac0ca28 100644
--- a/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
+++ b/solr/core/src/java/org/apache/solr/handler/IncrementalShardBackup.java
@@ -150,7 +150,7 @@ public class IncrementalShardBackup {
.getDirectoryFactory()
.get(
solrCore.getIndexDir(),
- DirectoryFactory.DirContext.DEFAULT,
+ DirectoryFactory.DirContext.BACKUP,
solrCore.getSolrConfig().indexConfig.lockType);
try {
BackupStats stats = incrementalCopy(files, dir);
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
index 638312e592d..4ef50072bfc 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
@@ -51,6 +51,7 @@ import org.apache.solr.client.api.model.FileMetaData;
import org.apache.solr.client.api.model.IndexVersionResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.FastOutputStream;
+import org.apache.solr.common.util.IOUtils;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
@@ -160,13 +161,7 @@ public abstract class ReplicationAPIBase extends JerseyResource {
List<FileMetaData> result = new ArrayList<>();
Directory dir = null;
try {
- dir =
- solrCore
- .getDirectoryFactory()
- .get(
- solrCore.getNewIndexDir(),
- DirectoryFactory.DirContext.DEFAULT,
- solrCore.getSolrConfig().indexConfig.lockType);
+ dir = getDirectory();
SegmentInfos infos = SegmentInfos.readCommit(dir, commit.getSegmentsFileName());
for (SegmentCommitInfo commitInfo : infos) {
for (String file : commitInfo.files()) {
@@ -246,6 +241,15 @@ public abstract class ReplicationAPIBase extends JerseyResource {
return filesResponse;
}
+ private Directory getDirectory() throws IOException {
+ return solrCore
+ .getDirectoryFactory()
+ .get(
+ solrCore.getNewIndexDir(),
+ DirectoryFactory.DirContext.REPLICATION,
+ solrCore.getSolrConfig().indexConfig.lockType);
+ }
+
/** This class is used to read and send files in the lucene index */
protected class DirectoryFileStream implements SolrCore.RawWriter, StreamingOutput {
protected FastOutputStream fos;
@@ -373,11 +377,12 @@ public abstract class ReplicationAPIBase extends JerseyResource {
public void write(OutputStream out) throws IOException {
createOutputStream(out);
+ Directory dir = null;
IndexInput in = null;
try {
initWrite();
- Directory dir = solrCore.withSearcher(searcher -> searcher.getIndexReader().directory());
+ dir = getDirectory();
in = dir.openInput(fileName, IOContext.READONCE);
// if offset is mentioned move the pointer to that point
if (offset != -1) in.seek(offset);
@@ -428,8 +433,13 @@ public abstract class ReplicationAPIBase extends JerseyResource {
indexGen,
useChecksum);
} finally {
- if (in != null) {
- in.close();
+ IOUtils.closeQuietly(in);
+ if (dir != null) {
+ try {
+ solrCore.getDirectoryFactory().release(dir);
+ } catch (IOException e) {
+ log.error("Could not release directory after streaming file", e);
+ }
}
extendReserveAndReleaseCommitPoint();
}
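
For context on what the streaming path does with the unwrapped directory: copying one index file amounts to opening an IndexInput and pumping its bytes to the response stream. A simplified, hypothetical helper (buffer size arbitrary; the real code's offset, checksum, and rate-limit handling omitted), shown without its enclosing class:

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexInput;

    void streamRawFile(Directory dir, String fileName, OutputStream out) throws IOException {
      byte[] buf = new byte[1024 * 1024];
      try (IndexInput in = dir.openInput(fileName, IOContext.READONCE)) {
        long remaining = in.length();
        while (remaining > 0) {
          int chunk = (int) Math.min(buf.length, remaining);
          in.readBytes(buf, 0, chunk); // raw bytes; no FilterDirectory post-processing
          out.write(buf, 0, chunk);
          remaining -= chunk;
        }
      }
    }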