This is an automated email from the ASF dual-hosted git repository.
gerlowskija pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 6f94c505fc9 SOLR-16470: Create v2 replication "fetch file" API (#2734)
6f94c505fc9 is described below
commit 6f94c505fc923c452c58c2a3946ee4408ea06fac
Author: Matthew Biscocho <[email protected]>
AuthorDate: Fri Nov 22 09:19:39 2024 -0500
SOLR-16470: Create v2 replication "fetch file" API (#2734)
New v2 API is available at
`GET /api/cores/coreName/replication/files/fileName`
---------
Co-authored-by: Matthew Biscocho <[email protected]>
Co-authored-by: Jason Gerlowski <[email protected]>
---
solr/CHANGES.txt | 3 +
.../solr/client/api/endpoint/ReplicationApis.java | 46 +++
.../org/apache/solr/cloud/ReplicateFromLeader.java | 3 +-
.../src/java/org/apache/solr/core/SolrCore.java | 8 +-
.../org/apache/solr/filestore/NodeFileStore.java | 2 +-
.../java/org/apache/solr/handler/BlobHandler.java | 5 +-
.../org/apache/solr/handler/ExportHandler.java | 5 +-
.../java/org/apache/solr/handler/IndexFetcher.java | 32 +-
.../apache/solr/handler/ReplicationHandler.java | 382 ++++-----------------
.../solr/handler/admin/HealthCheckHandler.java | 2 +-
.../solr/handler/admin/api/CoreReplication.java | 20 ++
.../solr/handler/admin/api/ReplicationAPIBase.java | 370 ++++++++++++++++++++
.../org/apache/solr/handler/api/V2ApiUtils.java | 2 +-
.../solr/handler/TestReplicationHandler.java | 7 +-
.../TestReplicationHandlerDiskOverFlow.java | 1 +
.../handler/admin/api/CoreReplicationAPITest.java | 42 ++-
.../apache/solr/handler/api/V2ApiUtilsTest.java | 2 +-
.../pages/user-managed-index-replication.adoc | 45 +++
18 files changed, 624 insertions(+), 353 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 666958c64da..1ddedd3ec42 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -172,6 +172,9 @@ Improvements
New APIs for listing-all and fetching-single cluster props are also now
available at `GET /api/cluster/properties` and
`GET /api/cluster/properties/somePropName`, respectively. (Carlos Ugarte via
Jason Gerlowski)
+* SOLR-16470: Replication "fetch file" API now has a v2 equivalent, available
at `GET /api/cores/coreName/replication/files/fileName`
+ (Matthew Biscocho via Jason Gerlowski)
+
Optimizations
---------------------
diff --git
a/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
b/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
index ac33894b656..3fe5ac14f45 100644
--- a/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
+++ b/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
@@ -16,11 +16,18 @@
*/
package org.apache.solr.client.api.endpoint;
+import static
org.apache.solr.client.api.util.Constants.OMIT_FROM_CODEGEN_PROPERTY;
+
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.extensions.Extension;
+import io.swagger.v3.oas.annotations.extensions.ExtensionProperty;
+import jakarta.ws.rs.DefaultValue;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.QueryParam;
+import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
import org.apache.solr.client.api.model.FileListResponse;
import org.apache.solr.client.api.model.IndexVersionResponse;
@@ -47,4 +54,43 @@ public interface ReplicationApis {
@Parameter(description = "The generation number of the index", required
= true)
@QueryParam("generation")
long gen);
+
+ @GET
+ @CoreApiParameters
+ @Operation(
+ summary = "Get a stream of a specific file path of a core",
+ tags = {"core-replication"},
+ extensions = { // TODO Remove as a part of SOLR-17562
+ @Extension(
+ properties = {@ExtensionProperty(name =
OMIT_FROM_CODEGEN_PROPERTY, value = "true")})
+ })
+ @Path("/files/{filePath}")
+ StreamingOutput fetchFile(
+ @PathParam("filePath") String filePath,
+ @Parameter(
+ description =
+ "Directory type for specific filePath (cf or tlogFile).
Defaults to Lucene index (file) directory if empty",
+ required = true)
+ @QueryParam("dirType")
+ String dirType,
+ @Parameter(description = "Output stream read/write offset", required =
false)
+ @QueryParam("offset")
+ String offset,
+ @Parameter(required = false) @QueryParam("len") String len,
+ @Parameter(description = "Compress file output", required = false)
+ @QueryParam("compression")
+ @DefaultValue("false")
+ Boolean compression,
+ @Parameter(description = "Write checksum with output stream", required =
false)
+ @QueryParam("checksum")
+ @DefaultValue("false")
+ Boolean checksum,
+ @Parameter(
+ description = "Limit data write per seconds. Defaults to no
throttling",
+ required = false)
+ @QueryParam("maxWriteMBPerSec")
+ double maxWriteMBPerSec,
+ @Parameter(description = "The generation number of the index", required
= false)
+ @QueryParam("generation")
+ Long gen);
}
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 43390e63e9b..06bbbefedbe 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -27,6 +27,7 @@ import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.CommitUpdateCommand;
@@ -89,7 +90,7 @@ public class ReplicateFromLeader {
NamedList<Object> followerConfig = new NamedList<>();
followerConfig.add(ReplicationHandler.FETCH_FROM_LEADER, Boolean.TRUE);
followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO,
Boolean.TRUE);
- followerConfig.add(ReplicationHandler.POLL_INTERVAL, pollIntervalStr);
+ followerConfig.add(ReplicationAPIBase.POLL_INTERVAL, pollIntervalStr);
NamedList<Object> replicationConfig = new NamedList<>();
replicationConfig.add("follower", followerConfig);
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java
b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 749436a0e8c..6ac06ffa266 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -106,9 +106,9 @@ import org.apache.solr.core.snapshots.SolrSnapshotManager;
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
import
org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
import org.apache.solr.handler.IndexFetcher;
-import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.handler.SolrConfigHandler;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.api.V2ApiUtils;
import org.apache.solr.handler.component.HighlightComponent;
import org.apache.solr.handler.component.SearchComponent;
@@ -3018,7 +3018,7 @@ public class SolrCore implements SolrInfoBean, Closeable {
m.put("schema.xml", new SchemaXmlResponseWriter());
m.put("smile", new SmileResponseWriter());
m.put(PROMETHEUS_METRICS_WT, new PrometheusResponseWriter());
- m.put(ReplicationHandler.FILE_STREAM, getFileStreamWriter());
+ m.put(ReplicationAPIBase.FILE_STREAM, getFileStreamWriter());
DEFAULT_RESPONSE_WRITERS = Collections.unmodifiableMap(m);
try {
m.put(
@@ -3037,7 +3037,7 @@ public class SolrCore implements SolrInfoBean, Closeable {
@Override
public void write(OutputStream out, SolrQueryRequest req,
SolrQueryResponse response)
throws IOException {
- RawWriter rawWriter = (RawWriter)
response.getValues().get(ReplicationHandler.FILE_STREAM);
+ RawWriter rawWriter = (RawWriter)
response.getValues().get(ReplicationAPIBase.FILE_STREAM);
if (rawWriter != null) {
rawWriter.write(out);
if (rawWriter instanceof Closeable) ((Closeable) rawWriter).close();
@@ -3046,7 +3046,7 @@ public class SolrCore implements SolrInfoBean, Closeable {
@Override
public String getContentType(SolrQueryRequest request, SolrQueryResponse
response) {
- RawWriter rawWriter = (RawWriter)
response.getValues().get(ReplicationHandler.FILE_STREAM);
+ RawWriter rawWriter = (RawWriter)
response.getValues().get(ReplicationAPIBase.FILE_STREAM);
if (rawWriter != null) {
return rawWriter.getContentType();
} else {
diff --git a/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
b/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
index 2b12274ad26..e6c61419437 100644
--- a/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
+++ b/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
@@ -17,7 +17,7 @@
package org.apache.solr.filestore;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
import static org.apache.solr.response.RawResponseWriter.CONTENT;
import static
org.apache.solr.security.PermissionNameProvider.Name.FILESTORE_READ_PERM;
diff --git a/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
b/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
index 003a9412741..540cc03f9ce 100644
--- a/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
@@ -58,6 +58,7 @@ import org.apache.solr.common.util.Utils;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.api.GetBlobInfoAPI;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.admin.api.UploadBlobAPI;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
@@ -194,7 +195,7 @@ public class BlobHandler extends RequestHandlerBase
return;
}
}
- if
(ReplicationHandler.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))) {
+ if
(ReplicationAPIBase.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))) {
if (blobName == null) {
throw new SolrException(
SolrException.ErrorCode.NOT_FOUND,
@@ -211,7 +212,7 @@ public class BlobHandler extends RequestHandlerBase
new Sort(new SortField("version", SortField.Type.LONG,
true)));
if (docs.totalHits.value > 0) {
rsp.add(
- ReplicationHandler.FILE_STREAM,
+ ReplicationAPIBase.FILE_STREAM,
new SolrCore.RawWriter() {
@Override
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
index 85a99dfaea9..8fa1ba64e38 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.component.SearchHandler;
import org.apache.solr.handler.export.ExportWriter;
import org.apache.solr.handler.export.ExportWriterStream;
@@ -125,10 +126,10 @@ public class ExportHandler extends SearchHandler {
}
String wt = req.getParams().get(CommonParams.WT, JSON);
if ("xsort".equals(wt)) wt = JSON;
- Map<String, String> map = Map.of(CommonParams.WT,
ReplicationHandler.FILE_STREAM);
+ Map<String, String> map = Map.of(CommonParams.WT,
ReplicationAPIBase.FILE_STREAM);
req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map),
req.getParams()));
rsp.add(
- ReplicationHandler.FILE_STREAM,
+ ReplicationAPIBase.FILE_STREAM,
new ExportWriter(
req, rsp, wt, initialStreamContext, solrMetricsContext,
writerMetricsPath));
}
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 0c57ff226c8..95c4a1be611 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -19,27 +19,25 @@ package org.apache.solr.handler;
import static org.apache.solr.common.params.CommonParams.JAVABIN;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.handler.ReplicationHandler.ALIAS;
-import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
import static org.apache.solr.handler.ReplicationHandler.COMMAND;
-import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
-import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
-import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
import static org.apache.solr.handler.ReplicationHandler.FETCH_FROM_LEADER;
-import static org.apache.solr.handler.ReplicationHandler.FILE;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
-import static org.apache.solr.handler.ReplicationHandler.GENERATION;
-import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
import static org.apache.solr.handler.ReplicationHandler.LEADER_URL;
import static org.apache.solr.handler.ReplicationHandler.LEGACY_LEADER_URL;
import static
org.apache.solr.handler.ReplicationHandler.LEGACY_SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
-import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import static org.apache.solr.handler.ReplicationHandler.SIZE;
import static
org.apache.solr.handler.ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.CHECKSUM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.COMPRESSION;
+import static
org.apache.solr.handler.admin.api.ReplicationAPIBase.CONF_FILE_SHORT;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.OFFSET;
import java.io.File;
import java.io.FileNotFoundException;
@@ -121,7 +119,7 @@ import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler.FileInfo;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
@@ -303,8 +301,8 @@ public class IndexFetcher {
this.replicationHandler = handler;
String compress = (String) initArgs.get(COMPRESSION);
- useInternalCompression = INTERNAL.equals(compress);
- useExternalCompression = EXTERNAL.equals(compress);
+ useInternalCompression = ReplicationHandler.INTERNAL.equals(compress);
+ useExternalCompression = ReplicationHandler.EXTERNAL.equals(compress);
connTimeout = getParameter(initArgs,
HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000, null);
// allow a leader override for tests - you specify this in /replication
follower section of
@@ -1575,7 +1573,7 @@ public class IndexFetcher {
return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
}
- private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
+ private final Map<String, ReplicationHandler.FileInfo> confFileInfoCache =
new HashMap<>();
/**
* The local conf files are compared with the conf files in the leader. If
they are same (by
@@ -1723,7 +1721,7 @@ public class IndexFetcher {
* The class acts as a client for ReplicationHandler.FileStream. It
understands the protocol of
* wt=filestream
*
- * @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
+ * <p>see
org.apache.solr.handler.admin.api.ReplicationAPIBase.DirectoryFileStream
*/
private class FileFetcher {
private final FileInterface file;
@@ -1750,7 +1748,7 @@ public class IndexFetcher {
this.file = file;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
- buf = new byte[(int) Math.min(this.size, ReplicationHandler.PACKET_SZ)];
+ buf = new byte[(int) Math.min(this.size, ReplicationAPIBase.PACKET_SZ)];
this.solrParamOutput = solrParamOutput;
this.saveAs = saveAs;
indexGen = latestGen;
@@ -2047,7 +2045,7 @@ public class IndexFetcher {
}
}
- private class DirectoryFileFetcher extends FileFetcher {
+ protected class DirectoryFileFetcher extends FileFetcher {
DirectoryFileFetcher(
Directory tmpIndexDir,
Map<String, Object> fileDetails,
@@ -2107,7 +2105,7 @@ public class IndexFetcher {
}
}
- private class LocalFsFileFetcher extends FileFetcher {
+ protected class LocalFsFileFetcher extends FileFetcher {
LocalFsFileFetcher(
File dir,
Map<String, Object> fileDetails,
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index a73325b49c4..ebea07e26bc 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -17,16 +17,25 @@
package org.apache.solr.handler;
import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.CHECKSUM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.COMPRESSION;
+import static
org.apache.solr.handler.admin.api.ReplicationAPIBase.CONF_FILE_SHORT;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
+import static
org.apache.solr.handler.admin.api.ReplicationAPIBase.INTERVAL_ERR_MSG;
+import static
org.apache.solr.handler.admin.api.ReplicationAPIBase.INTERVAL_PATTERN;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.LEN;
+import static
org.apache.solr.handler.admin.api.ReplicationAPIBase.MAX_WRITE_PER_SECOND;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.OFFSET;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.STATUS;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.TLOG_FILE;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
@@ -52,11 +61,8 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
-import java.util.zip.DeflaterOutputStream;
-import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexDeletionPolicy;
@@ -64,7 +70,6 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.RateLimiter;
import org.apache.solr.api.JerseyResource;
import org.apache.solr.client.api.model.FileMetaData;
import org.apache.solr.client.api.model.IndexVersionResponse;
@@ -76,7 +81,6 @@ import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -94,6 +98,7 @@ import
org.apache.solr.core.backup.repository.LocalFileSystemRepository;
import org.apache.solr.handler.IndexFetcher.IndexFetchResult;
import org.apache.solr.handler.ReplicationHandler.ReplicationHandlerConfig;
import org.apache.solr.handler.admin.api.CoreReplication;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.handler.admin.api.SnapshotBackupAPI;
import org.apache.solr.handler.api.V2ApiUtils;
import org.apache.solr.jersey.APIConfigProvider;
@@ -273,7 +278,7 @@ public class ReplicationHandler extends RequestHandlerBase
final SolrJerseyResponse indexVersionResponse =
getIndexVersionResponse();
V2ApiUtils.squashIntoSolrResponseWithoutHeader(rsp,
indexVersionResponse);
} else if (command.equals(CMD_GET_FILE)) {
- getFileStream(solrParams, rsp);
+ getFileStream(solrParams, rsp, req);
} else if (command.equals(CMD_GET_FILE_LIST)) {
final CoreReplication coreReplicationAPI = new CoreReplication(core,
req, rsp);
V2ApiUtils.squashIntoSolrResponseWithoutHeader(
@@ -313,6 +318,55 @@ public class ReplicationHandler extends RequestHandlerBase
}
}
+ /**
+ * This method adds an Object of FileStream to the response. The FileStream
implements a custom
+ * protocol which is understood by IndexFetcher.FileFetcher
+ *
+ * @see IndexFetcher.LocalFsFileFetcher
+ * @see IndexFetcher.DirectoryFileFetcher
+ */
+ private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp,
SolrQueryRequest req)
+ throws IOException {
+ final CoreReplication coreReplicationAPI = new CoreReplication(core, req,
rsp);
+ String fileName;
+ String dirType;
+
+ if (solrParams.get(CONF_FILE_SHORT) != null) {
+ fileName = solrParams.get(CONF_FILE_SHORT);
+ dirType = CONF_FILE_SHORT;
+ } else if (solrParams.get(TLOG_FILE) != null) {
+ fileName = solrParams.get(TLOG_FILE);
+ dirType = TLOG_FILE;
+ } else if (solrParams.get(FILE) != null) {
+ fileName = solrParams.get(FILE);
+ dirType = FILE;
+ } else {
+ reportErrorOnResponse(
+ rsp,
+ "Missing file parameter",
+ new SolrException(SolrException.ErrorCode.BAD_REQUEST, "File not
specified in request"));
+ return;
+ }
+
+ if (solrParams.getParams(CommonParams.WT) == null) {
+ reportErrorOnResponse(
+ rsp,
+ "Missing wt parameter",
+ new SolrException(SolrException.ErrorCode.BAD_REQUEST, "wt not
specified in request"));
+ return;
+ }
+
+ coreReplicationAPI.fetchFile(
+ fileName,
+ dirType,
+ solrParams.get(OFFSET),
+ solrParams.get(LEN),
+ Boolean.parseBoolean(solrParams.get(COMPRESSION)),
+ solrParams.getBool(CHECKSUM, false),
+ solrParams.getDouble(MAX_WRITE_PER_SECOND, Double.MAX_VALUE),
+ solrParams.getLong(GENERATION));
+ }
+
static boolean getBoolWithBackwardCompatibility(
SolrParams params, String preferredKey, String alternativeKey, boolean
defaultValue) {
Boolean value = params.getBool(preferredKey);
@@ -674,29 +728,6 @@ public class ReplicationHandler extends RequestHandlerBase
snapShooter.createSnapAsync(numberToKeep, result);
}
- /**
- * This method adds an Object of FileStream to the response . The FileStream
implements a custom
- * protocol which is understood by IndexFetcher.FileFetcher
- *
- * @see IndexFetcher.LocalFsFileFetcher
- * @see IndexFetcher.DirectoryFileFetcher
- */
- private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
- ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
- rawParams.set(CommonParams.WT, FILE_STREAM);
-
- String cfileName = solrParams.get(CONF_FILE_SHORT);
- String tlogFileName = solrParams.get(TLOG_FILE);
- if (cfileName != null) {
- rsp.add(FILE_STREAM, new LocalFsConfFileStream(solrParams));
- } else if (tlogFileName != null) {
- rsp.add(FILE_STREAM, new LocalFsTlogFileStream(solrParams));
- } else {
- rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
- }
- rsp.add(STATUS, OK_STATUS);
- }
-
public IndexVersionResponse getIndexVersionResponse() throws IOException {
IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't
change
@@ -908,7 +939,7 @@ public class ReplicationHandler extends RequestHandlerBase
if (fetcher != null) {
map.put(LEADER_URL, fetcher.getLeaderCoreUrl());
if (getPollInterval() != null) {
- map.put(POLL_INTERVAL, getPollInterval());
+ map.put(ReplicationAPIBase.POLL_INTERVAL, getPollInterval());
}
map.put("isPollingDisabled", isPollingDisabled());
map.put("isReplicating", isReplicating());
@@ -990,7 +1021,7 @@ public class ReplicationHandler extends RequestHandlerBase
}
follower.add(LEADER_URL, fetcher.getLeaderCoreUrl());
if (getPollInterval() != null) {
- follower.add(POLL_INTERVAL, getPollInterval());
+ follower.add(ReplicationAPIBase.POLL_INTERVAL, getPollInterval());
}
Date nextScheduled = getNextScheduledExecTime();
if (nextScheduled != null && !isPollingDisabled()) {
@@ -1263,7 +1294,7 @@ public class ReplicationHandler extends RequestHandlerBase
boolean enableFollower = isEnabled(follower);
if (enableFollower) {
currentIndexFetcher = pollingIndexFetcher = new IndexFetcher(follower,
this, core);
- setupPolling((String) follower.get(POLL_INTERVAL));
+ setupPolling((String) follower.get(ReplicationAPIBase.POLL_INTERVAL));
isFollower = true;
}
NamedList<?> leader = getObjectWithBackwardCompatibility(initArgs,
"leader", "master");
@@ -1512,257 +1543,11 @@ public class ReplicationHandler extends
RequestHandlerBase
};
}
- /** This class is used to read and send files in the lucene index */
- private class DirectoryFileStream implements SolrCore.RawWriter {
- protected SolrParams params;
-
- protected FastOutputStream fos;
-
- protected Long indexGen;
- protected IndexDeletionPolicyWrapper delPolicy;
-
- protected String fileName;
- protected String cfileName;
- protected String tlogFileName;
- protected String sOffset;
- protected String sLen;
- protected final boolean compress;
- protected boolean useChecksum;
-
- protected long offset = -1;
- protected int len = -1;
-
- protected Checksum checksum;
-
- private RateLimiter rateLimiter;
-
- byte[] buf;
-
- public DirectoryFileStream(SolrParams solrParams) {
- params = solrParams;
- delPolicy = core.getDeletionPolicy();
-
- fileName = validateFilenameOrError(params.get(FILE));
- cfileName = validateFilenameOrError(params.get(CONF_FILE_SHORT));
- tlogFileName = validateFilenameOrError(params.get(TLOG_FILE));
-
- sOffset = params.get(OFFSET);
- sLen = params.get(LEN);
- compress = Boolean.parseBoolean(params.get(COMPRESSION));
- useChecksum = params.getBool(CHECKSUM, false);
- indexGen = params.getLong(GENERATION);
- if (useChecksum) {
- checksum = new Adler32();
- }
- // No throttle if MAX_WRITE_PER_SECOND is not specified
- double maxWriteMBPerSec = params.getDouble(MAX_WRITE_PER_SECOND,
Double.MAX_VALUE);
- rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
- }
-
- // Throw exception on directory traversal attempts
- protected String validateFilenameOrError(String fileName) {
- if (fileName != null) {
- Path filePath = Paths.get(fileName);
- filePath.forEach(
- subpath -> {
- if ("..".equals(subpath.toString())) {
- throw new SolrException(ErrorCode.FORBIDDEN, "File name cannot
contain ..");
- }
- });
- if (filePath.isAbsolute()) {
- throw new SolrException(ErrorCode.FORBIDDEN, "File name must be
relative");
- }
- return fileName;
- } else return null;
- }
-
- protected void initWrite() throws IOException {
- if (sOffset != null) offset = Long.parseLong(sOffset);
- if (sLen != null) len = Integer.parseInt(sLen);
- if (fileName == null && cfileName == null && tlogFileName == null) {
- // no filename do nothing
- writeNothingAndFlush();
- }
- buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
-
- // reserve commit point till write is complete
- if (indexGen != null) {
- delPolicy.saveCommitPoint(indexGen);
- }
- }
-
- protected void createOutputStream(OutputStream out) {
- // DeflaterOutputStream requires a close call, but don't close the
request outputstream
- out = new CloseShieldOutputStream(out);
- if (compress) {
- fos = new FastOutputStream(new DeflaterOutputStream(out));
- } else {
- fos = new FastOutputStream(out);
- }
- }
-
- protected void extendReserveAndReleaseCommitPoint() {
- if (indexGen != null) {
- // Reserve the commit point for another 10s for the next file to be to
fetched.
- // We need to keep extending the commit reservation between requests
so that the replica can
- // fetch all the files correctly.
- delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
-
- // release the commit point as the write is complete
- delPolicy.releaseCommitPoint(indexGen);
- }
- }
-
- @Override
- public void write(OutputStream out) throws IOException {
- createOutputStream(out);
-
- IndexInput in = null;
- try {
- initWrite();
-
- Directory dir = core.withSearcher(searcher ->
searcher.getIndexReader().directory());
- in = dir.openInput(fileName, IOContext.READONCE);
- // if offset is mentioned move the pointer to that point
- if (offset != -1) in.seek(offset);
-
- long filelen = dir.fileLength(fileName);
- long maxBytesBeforePause = 0;
-
- while (true) {
- offset = offset == -1 ? 0 : offset;
- int read = (int) Math.min(buf.length, filelen - offset);
- in.readBytes(buf, 0, read);
-
- fos.writeInt(read);
- if (useChecksum) {
- checksum.reset();
- checksum.update(buf, 0, read);
- fos.writeLong(checksum.getValue());
- }
- fos.write(buf, 0, read);
- fos.flush();
- log.debug("Wrote {} bytes for file {}", offset + read, fileName); //
nowarn
-
- // Pause if necessary
- maxBytesBeforePause += read;
- if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
- rateLimiter.pause(maxBytesBeforePause);
- maxBytesBeforePause = 0;
- }
- if (read != buf.length) {
- writeNothingAndFlush();
- // we close because DeflaterOutputStream requires a close call,
but the request
- // outputstream is protected
- fos.close();
- break;
- }
- offset += read;
- in.seek(offset);
- }
- } catch (IOException e) {
- log.warn("Exception while writing response for params: {}", params, e);
- } finally {
- if (in != null) {
- in.close();
- }
- extendReserveAndReleaseCommitPoint();
- }
- }
-
- /** Used to write a marker for EOF */
- protected void writeNothingAndFlush() throws IOException {
- fos.writeInt(0);
- fos.flush();
- }
- }
-
- /** This is used to write files in the conf directory. */
- private abstract class LocalFsFileStream extends DirectoryFileStream {
-
- private Path file;
-
- public LocalFsFileStream(SolrParams solrParams) {
- super(solrParams);
- this.file = this.initFile();
- }
-
- protected abstract Path initFile();
-
- @Override
- public void write(OutputStream out) throws IOException {
- createOutputStream(out);
- try {
- initWrite();
-
- if (Files.isReadable(file)) {
- try (SeekableByteChannel channel = Files.newByteChannel(file)) {
- // if offset is mentioned move the pointer to that point
- if (offset != -1) channel.position(offset);
- ByteBuffer bb = ByteBuffer.wrap(buf);
-
- while (true) {
- bb.clear();
- long bytesRead = channel.read(bb);
- if (bytesRead <= 0) {
- writeNothingAndFlush();
- // we close because DeflaterOutputStream requires a close
call, but the request
- // outputstream is protected
- fos.close();
- break;
- }
- fos.writeInt((int) bytesRead);
- if (useChecksum) {
- checksum.reset();
- checksum.update(buf, 0, (int) bytesRead);
- fos.writeLong(checksum.getValue());
- }
- fos.write(buf, 0, (int) bytesRead);
- fos.flush();
- }
- }
- } else {
- writeNothingAndFlush();
- }
- } catch (IOException e) {
- log.warn("Exception while writing response for params: {}", params, e);
- } finally {
- extendReserveAndReleaseCommitPoint();
- }
- }
- }
-
- private class LocalFsTlogFileStream extends LocalFsFileStream {
-
- public LocalFsTlogFileStream(SolrParams solrParams) {
- super(solrParams);
- }
-
- @Override
- protected Path initFile() {
- // if it is a tlog file read from tlog directory
- return Path.of(core.getUpdateHandler().getUpdateLog().getTlogDir(),
tlogFileName);
- }
- }
-
- private class LocalFsConfFileStream extends LocalFsFileStream {
-
- public LocalFsConfFileStream(SolrParams solrParams) {
- super(solrParams);
- }
-
- @Override
- protected Path initFile() {
- // if it is a conf file read from config directory
- return core.getResourceLoader().getConfigPath().resolve(cfileName);
- }
- }
-
- private static Long readIntervalMs(String interval) {
+ private Long readIntervalMs(String interval) {
return TimeUnit.MILLISECONDS.convert(readIntervalNs(interval),
TimeUnit.NANOSECONDS);
}
- private static Long readIntervalNs(String interval) {
+ private Long readIntervalNs(String interval) {
if (interval == null) return null;
int result = 0;
Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
@@ -1814,8 +1599,6 @@ public class ReplicationHandler extends RequestHandlerBase
public static final String LEGACY_SKIP_COMMIT_ON_LEADER_VERSION_ZERO =
"skipCommitOnMasterVersionZero";
- public static final String STATUS = "status";
-
public static final String MESSAGE = "message";
public static final String COMMAND = "command";
@@ -1850,24 +1633,8 @@ public class ReplicationHandler extends
RequestHandlerBase
public static final String CMD_DELETE_BACKUP = "deletebackup";
- public static final String GENERATION = "generation";
-
- public static final String OFFSET = "offset";
-
- public static final String LEN = "len";
-
- public static final String FILE = "file";
-
public static final String SIZE = "size";
- public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
-
- public static final String CONF_FILE_SHORT = "cf";
-
- public static final String TLOG_FILE = "tlogFile";
-
- public static final String CHECKSUM = "checksum";
-
public static final String ALIAS = "alias";
public static final String CONF_CHECKSUM = "confchecksum";
@@ -1876,21 +1643,8 @@ public class ReplicationHandler extends
RequestHandlerBase
public static final String REPLICATE_AFTER = "replicateAfter";
- public static final String FILE_STREAM = "filestream";
-
- public static final String POLL_INTERVAL = "pollInterval";
-
- public static final String INTERVAL_ERR_MSG =
- "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
-
- private static final Pattern INTERVAL_PATTERN =
Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
-
- public static final int PACKET_SZ = 1024 * 1024; // 1MB
-
public static final String RESERVE = "commitReserveDuration";
- public static final String COMPRESSION = "compression";
-
public static final String EXTERNAL = "external";
public static final String INTERNAL = "internal";
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
b/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
index a3724aa626c..897d9921e2c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
@@ -20,7 +20,7 @@ package org.apache.solr.handler.admin;
import static org.apache.solr.common.params.CommonParams.FAILURE;
import static org.apache.solr.common.params.CommonParams.OK;
import static org.apache.solr.common.params.CommonParams.STATUS;
-import static org.apache.solr.handler.ReplicationHandler.GENERATION;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
b/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
index c4071ced8fc..1389c6e780f 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
@@ -19,10 +19,12 @@ package org.apache.solr.handler.admin.api;
import static
org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;
import jakarta.inject.Inject;
+import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
import org.apache.solr.client.api.endpoint.ReplicationApis;
import org.apache.solr.client.api.model.FileListResponse;
import org.apache.solr.client.api.model.IndexVersionResponse;
+import org.apache.solr.common.SolrException;
import org.apache.solr.core.SolrCore;
import org.apache.solr.jersey.PermissionName;
import org.apache.solr.request.SolrQueryRequest;
@@ -51,4 +53,22 @@ public class CoreReplication extends ReplicationAPIBase
implements ReplicationAp
public FileListResponse fetchFileList(long gen) {
return doFetchFileList(gen);
}
+
+ @Override
+ @PermissionName(CORE_READ_PERM)
+ public StreamingOutput fetchFile(
+ String filePath,
+ String dirType,
+ String offset,
+ String len,
+ Boolean compression,
+ Boolean checksum,
+ double maxWriteMBPerSec,
+ Long gen) {
+ if (dirType == null) {
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Must
provide a dirType ");
+ }
+ return doFetchFile(
+ filePath, dirType, offset, len, compression, checksum,
maxWriteMBPerSec, gen);
+ }
}
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
index 0f5e9d5dcae..638312e592d 100644
---
a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
+++
b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
@@ -19,11 +19,24 @@ package org.apache.solr.handler.admin.api;
import static org.apache.solr.handler.ReplicationHandler.ERR_STATUS;
import static org.apache.solr.handler.ReplicationHandler.OK_STATUS;
+import jakarta.ws.rs.core.StreamingOutput;
import java.io.IOException;
+import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+import java.util.zip.DeflaterOutputStream;
+import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentCommitInfo;
@@ -31,10 +44,13 @@ import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
import org.apache.solr.api.JerseyResource;
import org.apache.solr.client.api.model.FileListResponse;
import org.apache.solr.client.api.model.FileMetaData;
import org.apache.solr.client.api.model.IndexVersionResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.FastOutputStream;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
@@ -47,7 +63,24 @@ import org.slf4j.LoggerFactory;
/** A common parent for "replication" (i.e. replication-level) APIs. */
public abstract class ReplicationAPIBase extends JerseyResource {
+ public static final String CONF_FILE_SHORT = "cf";
+ public static final String TLOG_FILE = "tlogFile";
+ public static final String FILE_STREAM = "filestream";
+ public static final String STATUS = "status";
+ public static final int PACKET_SZ = 1024 * 1024; // 1MB
+ public static final String GENERATION = "generation";
+ public static final String OFFSET = "offset";
+ public static final String LEN = "len";
+ public static final String FILE = "file";
+ public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
+ public static final String CHECKSUM = "checksum";
+ public static final String COMPRESSION = "compression";
+ public static final String POLL_INTERVAL = "pollInterval";
+ public static final String INTERVAL_ERR_MSG =
+ "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
+
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final Pattern INTERVAL_PATTERN =
Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
protected final SolrCore solrCore;
protected final SolrQueryRequest solrQueryRequest;
protected final SolrQueryResponse solrQueryResponse;
@@ -71,6 +104,33 @@ public abstract class ReplicationAPIBase extends
JerseyResource {
return getFileList(generation, replicationHandler);
}
+ protected DirectoryFileStream doFetchFile(
+ String filePath,
+ String dirType,
+ String offset,
+ String len,
+ boolean compression,
+ boolean checksum,
+ double maxWriteMBPerSec,
+ Long gen) {
+ DirectoryFileStream dfs;
+ if (Objects.equals(dirType, CONF_FILE_SHORT)) {
+ dfs =
+ new LocalFsConfFileStream(
+ filePath, dirType, offset, len, compression, checksum,
maxWriteMBPerSec, gen);
+ } else if (Objects.equals(dirType, TLOG_FILE)) {
+ dfs =
+ new LocalFsTlogFileStream(
+ filePath, dirType, offset, len, compression, checksum,
maxWriteMBPerSec, gen);
+ } else {
+ dfs =
+ new DirectoryFileStream(
+ filePath, dirType, offset, len, compression, checksum,
maxWriteMBPerSec, gen);
+ }
+ solrQueryResponse.add(FILE_STREAM, dfs);
+ return dfs;
+ }
+
protected FileListResponse getFileList(long generation, ReplicationHandler
replicationHandler) {
final IndexDeletionPolicyWrapper delPol = solrCore.getDeletionPolicy();
final FileListResponse filesResponse = new FileListResponse();
@@ -186,6 +246,316 @@ public abstract class ReplicationAPIBase extends
JerseyResource {
return filesResponse;
}
+ /** This class is used to read and send files in the lucene index */
+ protected class DirectoryFileStream implements SolrCore.RawWriter,
StreamingOutput {
+ protected FastOutputStream fos;
+
+ protected Long indexGen;
+ protected IndexDeletionPolicyWrapper delPolicy;
+
+ protected String fileName;
+ protected String cfileName;
+ protected String tlogFileName;
+ protected String sOffset;
+ protected String sLen;
+ protected final boolean compress;
+ protected boolean useChecksum;
+
+ protected long offset = -1;
+ protected int len = -1;
+
+ protected Checksum checksum;
+
+ private RateLimiter rateLimiter;
+
+ byte[] buf;
+
+ public DirectoryFileStream(
+ String file,
+ String dirType,
+ String offset,
+ String len,
+ boolean compression,
+ boolean useChecksum,
+ double maxWriteMBPerSec,
+ Long gen) {
+ delPolicy = solrCore.getDeletionPolicy();
+
+ fileName = validateFilenameOrError(file);
+
+ switch (dirType) {
+ case CONF_FILE_SHORT:
+ cfileName = file;
+ break;
+ case TLOG_FILE:
+ tlogFileName = file;
+ break;
+ default:
+ fileName = file;
+ break;
+ }
+
+ this.sOffset = offset;
+ this.sLen = len;
+ this.compress = compression;
+ this.useChecksum = useChecksum;
+ this.indexGen = gen;
+ if (useChecksum) {
+ checksum = new Adler32();
+ }
+ // No throttle if MAX_WRITE_PER_SECOND is not specified
+ if (maxWriteMBPerSec == 0) {
+ this.rateLimiter = new RateLimiter.SimpleRateLimiter(Double.MAX_VALUE);
+ } else {
+ this.rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
+ }
+ }
+
+ // Throw exception on directory traversal attempts
+ protected String validateFilenameOrError(String fileName) {
+ if (fileName != null) {
+ Path filePath = Paths.get(fileName);
+ filePath.forEach(
+ subpath -> {
+ if ("..".equals(subpath.toString())) {
+ throw new SolrException(
+ SolrException.ErrorCode.FORBIDDEN, "File name cannot
contain ..");
+ }
+ });
+ if (filePath.isAbsolute()) {
+ throw new SolrException(SolrException.ErrorCode.FORBIDDEN, "File
name must be relative");
+ }
+ return fileName;
+ } else return null;
+ }
+
+ protected void initWrite() throws IOException {
+ this.offset = (sOffset != null) ? Long.parseLong(sOffset) : -1;
+ this.len = (sLen != null) ? Integer.parseInt(sLen) : -1;
+ if (fileName == null && cfileName == null && tlogFileName == null) {
+ // no filename do nothing
+ writeNothingAndFlush();
+ }
+ buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
+
+ // reserve commit point till write is complete
+ if (indexGen != null) {
+ delPolicy.saveCommitPoint(indexGen);
+ }
+ }
+
+ protected void createOutputStream(OutputStream out) {
+ // DeflaterOutputStream requires a close call, but don't close the
request outputstream
+ out = new CloseShieldOutputStream(out);
+ if (compress) {
+ fos = new FastOutputStream(new DeflaterOutputStream(out));
+ } else {
+ fos = new FastOutputStream(out);
+ }
+ }
+
+ protected void extendReserveAndReleaseCommitPoint() {
+ ReplicationHandler replicationHandler =
+ (ReplicationHandler)
solrCore.getRequestHandler(ReplicationHandler.PATH);
+
+ if (indexGen != null) {
+ // Reserve the commit point for another 10s for the next file to be
fetched.
+ // We need to keep extending the commit reservation between requests
so that the replica can
+ // fetch all the files correctly.
+ delPolicy.setReserveDuration(indexGen,
replicationHandler.getReserveCommitDuration());
+
+ // release the commit point as the write is complete
+ delPolicy.releaseCommitPoint(indexGen);
+ }
+ }
+
+ @Override
+ public void write(OutputStream out) throws IOException {
+ createOutputStream(out);
+
+ IndexInput in = null;
+ try {
+ initWrite();
+
+ Directory dir = solrCore.withSearcher(searcher ->
searcher.getIndexReader().directory());
+ in = dir.openInput(fileName, IOContext.READONCE);
+ // if offset is mentioned move the pointer to that point
+ if (offset != -1) in.seek(offset);
+
+ long filelen = dir.fileLength(fileName);
+ long maxBytesBeforePause = 0;
+
+ while (true) {
+ offset = offset == -1 ? 0 : offset;
+ int read = (int) Math.min(buf.length, filelen - offset);
+ in.readBytes(buf, 0, read);
+
+ fos.writeInt(read);
+ if (useChecksum) {
+ checksum.reset();
+ checksum.update(buf, 0, read);
+ fos.writeLong(checksum.getValue());
+ }
+ fos.write(buf, 0, read);
+ fos.flush();
+ log.debug("Wrote {} bytes for file {}", offset + read, fileName); //
nowarn
+
+ // Pause if necessary
+ maxBytesBeforePause += read;
+ if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
+ rateLimiter.pause(maxBytesBeforePause);
+ maxBytesBeforePause = 0;
+ }
+ if (read != buf.length) {
+ writeNothingAndFlush();
+ // we close because DeflaterOutputStream requires a close call,
but the request
+ // outputstream is protected
+ fos.close();
+ break;
+ }
+ offset += read;
+ in.seek(offset);
+ }
+ } catch (IOException e) {
+ log.warn(
+ "Exception while writing response for params fileName={}
cfileName={} tlogFileName={} offset={} len={} compression={} generation={}
checksum={}",
+ fileName,
+ cfileName,
+ tlogFileName,
+ sOffset,
+ sLen,
+ compress,
+ indexGen,
+ useChecksum);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ extendReserveAndReleaseCommitPoint();
+ }
+ }
+
+ /** Used to write a marker for EOF */
+ protected void writeNothingAndFlush() throws IOException {
+ fos.writeInt(0);
+ fos.flush();
+ }
+ }
+
+ /** Base class for streaming files read directly from the local filesystem (conf and tlog files). */
+ protected abstract class LocalFsFileStream extends DirectoryFileStream {
+
+ private Path file;
+
+ public LocalFsFileStream(
+ String file,
+ String dirType,
+ String offset,
+ String len,
+ boolean compression,
+ boolean useChecksum,
+ double maxWriteMBPerSec,
+ Long gen) {
+ super(file, dirType, offset, len, compression, useChecksum,
maxWriteMBPerSec, gen);
+ this.file = this.initFile();
+ }
+
+ protected abstract Path initFile();
+
+ @Override
+ public void write(OutputStream out) throws IOException {
+ createOutputStream(out);
+ try {
+ initWrite();
+
+ if (Files.isReadable(file)) {
+ try (SeekableByteChannel channel = Files.newByteChannel(file)) {
+ // if offset is mentioned move the pointer to that point
+ if (offset != -1) channel.position(offset);
+ ByteBuffer bb = ByteBuffer.wrap(buf);
+
+ while (true) {
+ bb.clear();
+ long bytesRead = channel.read(bb);
+ if (bytesRead <= 0) {
+ writeNothingAndFlush();
+ // we close because DeflaterOutputStream requires a close
call, but the request
+ // outputstream is protected
+ fos.close();
+ break;
+ }
+ fos.writeInt((int) bytesRead);
+ if (useChecksum) {
+ checksum.reset();
+ checksum.update(buf, 0, (int) bytesRead);
+ fos.writeLong(checksum.getValue());
+ }
+ fos.write(buf, 0, (int) bytesRead);
+ fos.flush();
+ }
+ }
+ } else {
+ writeNothingAndFlush();
+ }
+ } catch (IOException e) {
+ log.warn(
+ "Exception while writing response for params fileName={}
cfileName={} tlogFileName={} offset={} len={} compression={} generation={}
checksum={}",
+ fileName,
+ cfileName,
+ tlogFileName,
+ sOffset,
+ sLen,
+ compress,
+ indexGen,
+ useChecksum);
+ } finally {
+ extendReserveAndReleaseCommitPoint();
+ }
+ }
+ }
+
+ protected class LocalFsTlogFileStream extends LocalFsFileStream {
+
+ public LocalFsTlogFileStream(
+ String file,
+ String dirType,
+ String offset,
+ String len,
+ boolean compression,
+ boolean useChecksum,
+ double maxWriteMBPerSec,
+ Long gen) {
+ super(file, dirType, offset, len, compression, useChecksum,
maxWriteMBPerSec, gen);
+ }
+
+ @Override
+ protected Path initFile() {
+ // if it is a tlog file read from tlog directory
+ return Path.of(solrCore.getUpdateHandler().getUpdateLog().getTlogDir(),
tlogFileName);
+ }
+ }
+
+ protected class LocalFsConfFileStream extends LocalFsFileStream {
+
+ public LocalFsConfFileStream(
+ String file,
+ String dirType,
+ String offset,
+ String len,
+ boolean compression,
+ boolean useChecksum,
+ double maxWriteMBPerSec,
+ Long gen) {
+ super(file, dirType, offset, len, compression, useChecksum,
maxWriteMBPerSec, gen);
+ }
+
+ @Override
+ protected Path initFile() {
+ // if it is a conf file read from config directory
+ return solrCore.getResourceLoader().getConfigPath().resolve(cfileName);
+ }
+ }
+
private void reportErrorOnResponse(
FileListResponse fileListResponse, String message, Exception e) {
fileListResponse.status = ERR_STATUS;
diff --git a/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
b/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
index e2a907d07d4..9a96b34afc0 100644
--- a/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
+++ b/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
@@ -19,7 +19,7 @@ package org.apache.solr.handler.api;
import static
org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
import static org.apache.solr.common.params.CommonParams.WT;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git
a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
index 8f424e97803..d1566b9c80d 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
@@ -72,6 +72,7 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.core.StandardDirectoryFactory;
import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
import org.apache.solr.embedded.JettySolrRunner;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
import org.apache.solr.security.AllowListUrlChecker;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.TimeOut;
@@ -1518,9 +1519,9 @@ public class TestReplicationHandler extends
SolrTestCaseJ4 {
Arrays.asList(absFile, "../dir/traversal", "illegal\rfile\nname\t");
List<String> params =
Arrays.asList(
- ReplicationHandler.FILE,
- ReplicationHandler.CONF_FILE_SHORT,
- ReplicationHandler.TLOG_FILE);
+ ReplicationAPIBase.FILE,
+ ReplicationAPIBase.CONF_FILE_SHORT,
+ ReplicationAPIBase.TLOG_FILE);
for (String param : params) {
for (String filename : illegalFilenames) {
expectThrows(
diff --git
a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
index c66a1dd7201..67b5cc47d9c 100644
---
a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
+++
b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
@@ -214,6 +214,7 @@ public class TestReplicationHandlerDiskOverFlow extends
SolrTestCaseJ4 {
.add("qt", "/replication")
.add("command", CMD_FETCH_INDEX)
.add("wait", "true"));
+
assertEquals("Replication command status", "OK",
response._getStr("status", null));
assertEquals(
diff --git
a/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
b/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
index 5943f7311a8..2e3a321bc5e 100644
---
a/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
+++
b/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
@@ -21,6 +21,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import io.opentelemetry.api.trace.Span;
+import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -29,9 +31,12 @@ import org.apache.solr.client.api.model.FileListResponse;
import org.apache.solr.client.api.model.FileMetaData;
import org.apache.solr.client.api.model.IndexVersionResponse;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrResourceLoader;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -42,8 +47,6 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
private CoreReplication coreReplicationAPI;
private SolrCore mockCore;
private ReplicationHandler mockReplicationHandler;
- private SolrQueryRequest mockQueryRequest;
- private SolrQueryResponse queryResponse;
@BeforeClass
public static void ensureWorkingMockito() {
@@ -55,9 +58,9 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
public void setUp() throws Exception {
super.setUp();
setUpMocks();
- mockQueryRequest = mock(SolrQueryRequest.class);
+ final var mockQueryRequest = mock(SolrQueryRequest.class);
when(mockQueryRequest.getSpan()).thenReturn(Span.getInvalid());
- queryResponse = new SolrQueryResponse();
+ final var queryResponse = new SolrQueryResponse();
coreReplicationAPI = new CoreReplicationAPIMock(mockCore,
mockQueryRequest, queryResponse);
}
@@ -81,10 +84,37 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
assertEquals(123456789, actualResponse.fileList.get(0).checksum);
}
- private void setUpMocks() {
+ @Test
+ public void testFetchFile() throws Exception {
+ ReplicationAPIBase.DirectoryFileStream actual =
+ coreReplicationAPI.doFetchFile("./test", "file", null, null, false,
false, 0, null);
+ assertNotNull(actual);
+
+ actual =
+ coreReplicationAPI.doFetchFile("./test", "tlogFile", null, null,
false, false, 0, null);
+ assertTrue(actual instanceof ReplicationAPIBase.LocalFsTlogFileStream);
+
+ actual = coreReplicationAPI.doFetchFile("./test", "cf", null, null, false,
false, 0, null);
+ assertTrue(actual instanceof ReplicationAPIBase.LocalFsConfFileStream);
+ }
+
+ private void setUpMocks() throws IOException {
mockCore = mock(SolrCore.class);
mockReplicationHandler = mock(ReplicationHandler.class);
+
+ // Mocks for LocalFsTlogFileStream
+ UpdateHandler mockUpdateHandler = mock(UpdateHandler.class);
+ UpdateLog mockUpdateLog = mock(UpdateLog.class);
+ when(mockUpdateHandler.getUpdateLog()).thenReturn(mockUpdateLog);
+ when(mockUpdateLog.getTlogDir()).thenReturn("ignore");
+
+ // Mocks for LocalFsConfFileStream
+ SolrResourceLoader mockSolrResourceLoader = mock(SolrResourceLoader.class);
+ Path mockPath = mock(Path.class);
when(mockCore.getRequestHandler(ReplicationHandler.PATH)).thenReturn(mockReplicationHandler);
+ when(mockCore.getUpdateHandler()).thenReturn(mockUpdateHandler);
+ when(mockCore.getResourceLoader()).thenReturn(mockSolrResourceLoader);
+ when(mockSolrResourceLoader.getConfigPath()).thenReturn(mockPath);
}
private static class CoreReplicationAPIMock extends CoreReplication {
@@ -94,7 +124,7 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
@Override
protected FileListResponse getFileList(long generation, ReplicationHandler
replicationHandler) {
- final FileListResponse filesResponse = new FileListResponse();
+ final var filesResponse = new FileListResponse();
List<FileMetaData> fileMetaData = Arrays.asList(new FileMetaData(123,
"test", 123456789));
filesResponse.fileList = new ArrayList<>(fileMetaData);
return filesResponse;
diff --git a/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
b/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
index 51b89243dbe..78f2f8d5fb9 100644
--- a/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
@@ -17,7 +17,7 @@
package org.apache.solr.handler.api;
import static
org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
import jakarta.ws.rs.core.MediaType;
import org.apache.solr.SolrTestCaseJ4;
diff --git
a/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
b/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
index 1a65a8b9aea..353fdcf598a 100644
---
a/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
+++
b/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
@@ -457,6 +457,51 @@
http://_host:port_/api/cores/_core_name_/replication/files?generation=<_generati
+
You can discover the generation number of the index by running the
`indexversion` command.
+`filecontent`::
+Retrieve the contents of a specific file from a core as a stream.
+
++
+====
+[.tab-label]*V1 API*
+
+[source,bash]
+----
+http://_host:port_/solr/_core_name_/replication?command=filecontent&<_directory-type_>=<_file-path_>&wt=filestream
+
+----
+====
++
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+http://_host:port_/api/cores/_core_name_/replication/files/<_file-path_>?dirType=<_directory-type_>
+
+----
+====
++
+A directory type is required for both the V1 and V2 APIs. The supported types are:
+
+* `file` Read from Lucene index file directory
+
+* `cf` Read from Configuration file directory
+
+* `tlogFile` Read from Tlog file directory
+
++
+There are several optional parameters:
+
+* `offset` Byte offset in the file at which to start reading
+
+* `compression` True/False compress file output
+
+* `checksum` True/False write checksum with output stream
+
+* `maxWriteMBPerSec` Limit the data write rate, in MB per second. Defaults to no throttling
+
+* `generation` The generation number of the index
+
`backup`::
Create a backup on leader if there are committed index data in the server;
otherwise, does nothing.
+