This is an automated email from the ASF dual-hosted git repository.

gerlowskija pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 0cf4183c48c SOLR-16470: Create v2 replication "fetch file" API (#2734)
0cf4183c48c is described below

commit 0cf4183c48c6e2118e09e733cd2ed8a3817becd6
Author: Matthew Biscocho <[email protected]>
AuthorDate: Fri Nov 22 09:19:39 2024 -0500

    SOLR-16470: Create v2 replication "fetch file" API (#2734)
    
    New v2 API is available at `GET /api/cores/coreName/replication/files/fileName`
    
    ---------
    
    Co-authored-by: Matthew Biscocho <[email protected]>
    Co-authored-by: Jason Gerlowski <[email protected]>
---
 solr/CHANGES.txt                                   |   3 +
 .../solr/client/api/endpoint/ReplicationApis.java  |  46 +++
 .../org/apache/solr/cloud/ReplicateFromLeader.java |   3 +-
 .../src/java/org/apache/solr/core/SolrCore.java    |   8 +-
 .../org/apache/solr/filestore/NodeFileStore.java   |   2 +-
 .../java/org/apache/solr/handler/BlobHandler.java  |   5 +-
 .../org/apache/solr/handler/ExportHandler.java     |   5 +-
 .../java/org/apache/solr/handler/IndexFetcher.java |  32 +-
 .../apache/solr/handler/ReplicationHandler.java    | 382 ++++-----------------
 .../solr/handler/admin/HealthCheckHandler.java     |   2 +-
 .../solr/handler/admin/api/CoreReplication.java    |  20 ++
 .../solr/handler/admin/api/ReplicationAPIBase.java | 370 ++++++++++++++++++++
 .../org/apache/solr/handler/api/V2ApiUtils.java    |   2 +-
 .../solr/handler/TestReplicationHandler.java       |   7 +-
 .../TestReplicationHandlerDiskOverFlow.java        |   1 +
 .../handler/admin/api/CoreReplicationAPITest.java  |  42 ++-
 .../apache/solr/handler/api/V2ApiUtilsTest.java    |   2 +-
 .../pages/user-managed-index-replication.adoc      |  45 +++
 18 files changed, 624 insertions(+), 353 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index d6f48884ade..e9da72a86b6 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -50,6 +50,9 @@ Improvements
   New APIs for listing-all and fetching-single cluster props are also now 
available at `GET /api/cluster/properties` and
   `GET /api/cluster/properties/somePropName`, respectively. (Carlos Ugarte via 
Jason Gerlowski)
 
+* SOLR-16470: Replication "fetch file" API now has a v2 equivalent, available at `GET /api/cores/coreName/replication/files/fileName`
+  (Matthew Biscocho via Jason Gerlowski)
+
 
 Optimizations
 ---------------------
diff --git 
a/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java 
b/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
index ac33894b656..3fe5ac14f45 100644
--- a/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
+++ b/solr/api/src/java/org/apache/solr/client/api/endpoint/ReplicationApis.java
@@ -16,11 +16,18 @@
  */
 package org.apache.solr.client.api.endpoint;
 
+import static 
org.apache.solr.client.api.util.Constants.OMIT_FROM_CODEGEN_PROPERTY;
+
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.extensions.Extension;
+import io.swagger.v3.oas.annotations.extensions.ExtensionProperty;
+import jakarta.ws.rs.DefaultValue;
 import jakarta.ws.rs.GET;
 import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.QueryParam;
+import jakarta.ws.rs.core.StreamingOutput;
 import java.io.IOException;
 import org.apache.solr.client.api.model.FileListResponse;
 import org.apache.solr.client.api.model.IndexVersionResponse;
@@ -47,4 +54,43 @@ public interface ReplicationApis {
       @Parameter(description = "The generation number of the index", required 
= true)
           @QueryParam("generation")
           long gen);
+
+  @GET
+  @CoreApiParameters
+  @Operation(
+      summary = "Get a stream of a specific file path of a core",
+      tags = {"core-replication"},
+      extensions = { // TODO Remove as a part of SOLR-17562
+        @Extension(
+            properties = {@ExtensionProperty(name = 
OMIT_FROM_CODEGEN_PROPERTY, value = "true")})
+      })
+  @Path("/files/{filePath}")
+  StreamingOutput fetchFile(
+      @PathParam("filePath") String filePath,
+      @Parameter(
+              description =
+                  "Directory type for specific filePath (cf or tlogFile). 
Defaults to Lucene index (file) directory if empty",
+              required = true)
+          @QueryParam("dirType")
+          String dirType,
+      @Parameter(description = "Output stream read/write offset", required = 
false)
+          @QueryParam("offset")
+          String offset,
+      @Parameter(required = false) @QueryParam("len") String len,
+      @Parameter(description = "Compress file output", required = false)
+          @QueryParam("compression")
+          @DefaultValue("false")
+          Boolean compression,
+      @Parameter(description = "Write checksum with output stream", required = 
false)
+          @QueryParam("checksum")
+          @DefaultValue("false")
+          Boolean checksum,
+      @Parameter(
+              description = "Limit data write per seconds. Defaults to no 
throttling",
+              required = false)
+          @QueryParam("maxWriteMBPerSec")
+          double maxWriteMBPerSec,
+      @Parameter(description = "The generation number of the index", required 
= false)
+          @QueryParam("generation")
+          Long gen);
 }
diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java 
b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
index 7abe46fe0c3..4d4e8fd3246 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java
@@ -28,6 +28,7 @@ import org.apache.solr.core.SolrConfig;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.IndexFetcher;
 import org.apache.solr.handler.ReplicationHandler;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.update.CommitUpdateCommand;
@@ -112,7 +113,7 @@ public class ReplicateFromLeader {
       followerConfig.add(
           ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, 
skipCommitOnLeaderVersionZero);
 
-      followerConfig.add("pollInterval", pollIntervalStr);
+      followerConfig.add(ReplicationAPIBase.POLL_INTERVAL, pollIntervalStr);
       NamedList<Object> replicationConfig = new NamedList<>();
       replicationConfig.add("follower", followerConfig);
 
diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java 
b/solr/core/src/java/org/apache/solr/core/SolrCore.java
index 205f5735c74..7e0c636dcbc 100644
--- a/solr/core/src/java/org/apache/solr/core/SolrCore.java
+++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java
@@ -107,9 +107,9 @@ import org.apache.solr.core.snapshots.SolrSnapshotManager;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import 
org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager.SnapshotMetaData;
 import org.apache.solr.handler.IndexFetcher;
-import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.handler.RequestHandlerBase;
 import org.apache.solr.handler.SolrConfigHandler;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.handler.api.V2ApiUtils;
 import org.apache.solr.handler.component.HighlightComponent;
 import org.apache.solr.handler.component.SearchComponent;
@@ -3027,7 +3027,7 @@ public class SolrCore implements SolrInfoBean, Closeable {
     m.put("schema.xml", new SchemaXmlResponseWriter());
     m.put("smile", new SmileResponseWriter());
     m.put(PROMETHEUS_METRICS_WT, new PrometheusResponseWriter());
-    m.put(ReplicationHandler.FILE_STREAM, getFileStreamWriter());
+    m.put(ReplicationAPIBase.FILE_STREAM, getFileStreamWriter());
     DEFAULT_RESPONSE_WRITERS = Collections.unmodifiableMap(m);
     try {
       m.put(
@@ -3046,7 +3046,7 @@ public class SolrCore implements SolrInfoBean, Closeable {
       @Override
       public void write(OutputStream out, SolrQueryRequest req, 
SolrQueryResponse response)
           throws IOException {
-        RawWriter rawWriter = (RawWriter) 
response.getValues().get(ReplicationHandler.FILE_STREAM);
+        RawWriter rawWriter = (RawWriter) 
response.getValues().get(ReplicationAPIBase.FILE_STREAM);
         if (rawWriter != null) {
           rawWriter.write(out);
           if (rawWriter instanceof Closeable) ((Closeable) rawWriter).close();
@@ -3055,7 +3055,7 @@ public class SolrCore implements SolrInfoBean, Closeable {
 
       @Override
       public String getContentType(SolrQueryRequest request, SolrQueryResponse 
response) {
-        RawWriter rawWriter = (RawWriter) 
response.getValues().get(ReplicationHandler.FILE_STREAM);
+        RawWriter rawWriter = (RawWriter) 
response.getValues().get(ReplicationAPIBase.FILE_STREAM);
         if (rawWriter != null) {
           return rawWriter.getContentType();
         } else {
diff --git a/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java 
b/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
index 2b12274ad26..e6c61419437 100644
--- a/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
+++ b/solr/core/src/java/org/apache/solr/filestore/NodeFileStore.java
@@ -17,7 +17,7 @@
 package org.apache.solr.filestore;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
 import static org.apache.solr.response.RawResponseWriter.CONTENT;
 import static 
org.apache.solr.security.PermissionNameProvider.Name.FILESTORE_READ_PERM;
 
diff --git a/solr/core/src/java/org/apache/solr/handler/BlobHandler.java 
b/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
index ff59b99f624..1db714553a4 100644
--- a/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/BlobHandler.java
@@ -56,6 +56,7 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.core.PluginInfo;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.handler.admin.api.GetBlobInfoAPI;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.handler.admin.api.UploadBlobAPI;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
@@ -192,7 +193,7 @@ public class BlobHandler extends RequestHandlerBase
           return;
         }
       }
-      if 
(ReplicationHandler.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))) {
+      if 
(ReplicationAPIBase.FILE_STREAM.equals(req.getParams().get(CommonParams.WT))) {
         if (blobName == null) {
           throw new SolrException(
               SolrException.ErrorCode.NOT_FOUND,
@@ -209,7 +210,7 @@ public class BlobHandler extends RequestHandlerBase
                       new Sort(new SortField("version", SortField.Type.LONG, 
true)));
           if (docs.totalHits.value > 0) {
             rsp.add(
-                ReplicationHandler.FILE_STREAM,
+                ReplicationAPIBase.FILE_STREAM,
                 new SolrCore.RawWriter() {
 
                   @Override
diff --git a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java 
b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
index 85a99dfaea9..8fa1ba64e38 100644
--- a/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ExportHandler.java
@@ -31,6 +31,7 @@ import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.handler.component.SearchHandler;
 import org.apache.solr.handler.export.ExportWriter;
 import org.apache.solr.handler.export.ExportWriterStream;
@@ -125,10 +126,10 @@ public class ExportHandler extends SearchHandler {
     }
     String wt = req.getParams().get(CommonParams.WT, JSON);
     if ("xsort".equals(wt)) wt = JSON;
-    Map<String, String> map = Map.of(CommonParams.WT, 
ReplicationHandler.FILE_STREAM);
+    Map<String, String> map = Map.of(CommonParams.WT, 
ReplicationAPIBase.FILE_STREAM);
     req.setParams(SolrParams.wrapDefaults(new MapSolrParams(map), 
req.getParams()));
     rsp.add(
-        ReplicationHandler.FILE_STREAM,
+        ReplicationAPIBase.FILE_STREAM,
         new ExportWriter(
             req, rsp, wt, initialStreamContext, solrMetricsContext, 
writerMetricsPath));
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java 
b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
index 0f67ea810e5..0a017908967 100644
--- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
+++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
@@ -19,27 +19,25 @@ package org.apache.solr.handler;
 import static org.apache.solr.common.params.CommonParams.JAVABIN;
 import static org.apache.solr.common.params.CommonParams.NAME;
 import static org.apache.solr.handler.ReplicationHandler.ALIAS;
-import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
 import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
 import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
 import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
 import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
 import static org.apache.solr.handler.ReplicationHandler.COMMAND;
-import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
 import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
-import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
-import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
 import static org.apache.solr.handler.ReplicationHandler.FETCH_FROM_LEADER;
-import static org.apache.solr.handler.ReplicationHandler.FILE;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
-import static org.apache.solr.handler.ReplicationHandler.GENERATION;
-import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
 import static org.apache.solr.handler.ReplicationHandler.LEADER_URL;
 import static org.apache.solr.handler.ReplicationHandler.LEGACY_LEADER_URL;
 import static 
org.apache.solr.handler.ReplicationHandler.LEGACY_SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
-import static org.apache.solr.handler.ReplicationHandler.OFFSET;
 import static org.apache.solr.handler.ReplicationHandler.SIZE;
 import static 
org.apache.solr.handler.ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.CHECKSUM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.COMPRESSION;
+import static 
org.apache.solr.handler.admin.api.ReplicationAPIBase.CONF_FILE_SHORT;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.OFFSET;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -122,7 +120,7 @@ import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.DirectoryFactory.DirContext;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
-import org.apache.solr.handler.ReplicationHandler.FileInfo;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.search.SolrIndexSearcher;
@@ -304,8 +302,8 @@ public class IndexFetcher {
 
     this.replicationHandler = handler;
     String compress = (String) initArgs.get(COMPRESSION);
-    useInternalCompression = INTERNAL.equals(compress);
-    useExternalCompression = EXTERNAL.equals(compress);
+    useInternalCompression = ReplicationHandler.INTERNAL.equals(compress);
+    useExternalCompression = ReplicationHandler.EXTERNAL.equals(compress);
     connTimeout = getParameter(initArgs, 
HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000, null);
 
     // allow a leader override for tests - you specify this in /replication 
follower section of
@@ -1576,7 +1574,7 @@ public class IndexFetcher {
     return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
   }
 
-  private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
+  private final Map<String, ReplicationHandler.FileInfo> confFileInfoCache = 
new HashMap<>();
 
   /**
    * The local conf files are compared with the conf files in the leader. If 
they are same (by
@@ -1724,7 +1722,7 @@ public class IndexFetcher {
    * The class acts as a client for ReplicationHandler.FileStream. It 
understands the protocol of
    * wt=filestream
    *
-   * @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
+   * <p>see 
org.apache.solr.handler.admin.api.ReplicationAPIBase.DirectoryFileStream
    */
   private class FileFetcher {
     private final FileInterface file;
@@ -1751,7 +1749,7 @@ public class IndexFetcher {
       this.file = file;
       this.fileName = (String) fileDetails.get(NAME);
       this.size = (Long) fileDetails.get(SIZE);
-      buf = new byte[(int) Math.min(this.size, ReplicationHandler.PACKET_SZ)];
+      buf = new byte[(int) Math.min(this.size, ReplicationAPIBase.PACKET_SZ)];
       this.solrParamOutput = solrParamOutput;
       this.saveAs = saveAs;
       indexGen = latestGen;
@@ -2050,7 +2048,7 @@ public class IndexFetcher {
     }
   }
 
-  private class DirectoryFileFetcher extends FileFetcher {
+  protected class DirectoryFileFetcher extends FileFetcher {
     DirectoryFileFetcher(
         Directory tmpIndexDir,
         Map<String, Object> fileDetails,
@@ -2110,7 +2108,7 @@ public class IndexFetcher {
     }
   }
 
-  private class LocalFsFileFetcher extends FileFetcher {
+  protected class LocalFsFileFetcher extends FileFetcher {
     LocalFsFileFetcher(
         File dir,
         Map<String, Object> fileDetails,
diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java 
b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
index 1f43cc3257f..d16f1ef42b0 100644
--- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java
@@ -17,16 +17,25 @@
 package org.apache.solr.handler;
 
 import static org.apache.solr.common.params.CommonParams.NAME;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.CHECKSUM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.COMPRESSION;
+import static 
org.apache.solr.handler.admin.api.ReplicationAPIBase.CONF_FILE_SHORT;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
+import static 
org.apache.solr.handler.admin.api.ReplicationAPIBase.INTERVAL_ERR_MSG;
+import static 
org.apache.solr.handler.admin.api.ReplicationAPIBase.INTERVAL_PATTERN;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.LEN;
+import static 
org.apache.solr.handler.admin.api.ReplicationAPIBase.MAX_WRITE_PER_SECOND;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.OFFSET;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.STATUS;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.TLOG_FILE;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
-import java.nio.ByteBuffer;
-import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
@@ -52,11 +61,8 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import java.util.zip.Adler32;
 import java.util.zip.Checksum;
-import java.util.zip.DeflaterOutputStream;
-import org.apache.commons.io.output.CloseShieldOutputStream;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.IndexDeletionPolicy;
@@ -64,7 +70,6 @@ import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
-import org.apache.lucene.store.RateLimiter;
 import org.apache.solr.api.JerseyResource;
 import org.apache.solr.client.api.model.FileMetaData;
 import org.apache.solr.client.api.model.IndexVersionResponse;
@@ -76,7 +81,6 @@ import org.apache.solr.common.params.CoreAdminParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.params.SolrParams;
 import org.apache.solr.common.util.ExecutorUtil;
-import org.apache.solr.common.util.FastOutputStream;
 import org.apache.solr.common.util.NamedList;
 import org.apache.solr.common.util.SimpleOrderedMap;
 import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -94,6 +98,7 @@ import 
org.apache.solr.core.backup.repository.LocalFileSystemRepository;
 import org.apache.solr.handler.IndexFetcher.IndexFetchResult;
 import org.apache.solr.handler.ReplicationHandler.ReplicationHandlerConfig;
 import org.apache.solr.handler.admin.api.CoreReplication;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.handler.admin.api.SnapshotBackupAPI;
 import org.apache.solr.handler.api.V2ApiUtils;
 import org.apache.solr.jersey.APIConfigProvider;
@@ -273,7 +278,7 @@ public class ReplicationHandler extends RequestHandlerBase
       final SolrJerseyResponse indexVersionResponse = 
getIndexVersionResponse();
       V2ApiUtils.squashIntoSolrResponseWithoutHeader(rsp, 
indexVersionResponse);
     } else if (command.equals(CMD_GET_FILE)) {
-      getFileStream(solrParams, rsp);
+      getFileStream(solrParams, rsp, req);
     } else if (command.equals(CMD_GET_FILE_LIST)) {
       final CoreReplication coreReplicationAPI = new CoreReplication(core, 
req, rsp);
       V2ApiUtils.squashIntoSolrResponseWithoutHeader(
@@ -313,6 +318,55 @@ public class ReplicationHandler extends RequestHandlerBase
     }
   }
 
+  /**
+   * This method adds an Object of FileStream to the response . The FileStream 
implements a custom
+   * protocol which is understood by IndexFetcher.FileFetcher
+   *
+   * @see IndexFetcher.LocalFsFileFetcher
+   * @see IndexFetcher.DirectoryFileFetcher
+   */
+  private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp, 
SolrQueryRequest req)
+      throws IOException {
+    final CoreReplication coreReplicationAPI = new CoreReplication(core, req, 
rsp);
+    String fileName;
+    String dirType;
+
+    if (solrParams.get(CONF_FILE_SHORT) != null) {
+      fileName = solrParams.get(CONF_FILE_SHORT);
+      dirType = CONF_FILE_SHORT;
+    } else if (solrParams.get(TLOG_FILE) != null) {
+      fileName = solrParams.get(TLOG_FILE);
+      dirType = TLOG_FILE;
+    } else if (solrParams.get(FILE) != null) {
+      fileName = solrParams.get(FILE);
+      dirType = FILE;
+    } else {
+      reportErrorOnResponse(
+          rsp,
+          "Missing file parameter",
+          new SolrException(SolrException.ErrorCode.BAD_REQUEST, "File not 
specified in request"));
+      return;
+    }
+
+    if (solrParams.getParams(CommonParams.WT) == null) {
+      reportErrorOnResponse(
+          rsp,
+          "Missing wt parameter",
+          new SolrException(SolrException.ErrorCode.BAD_REQUEST, "wt not 
specified in request"));
+      return;
+    }
+
+    coreReplicationAPI.fetchFile(
+        fileName,
+        dirType,
+        solrParams.get(OFFSET),
+        solrParams.get(LEN),
+        Boolean.parseBoolean(solrParams.get(COMPRESSION)),
+        solrParams.getBool(CHECKSUM, false),
+        solrParams.getDouble(MAX_WRITE_PER_SECOND, Double.MAX_VALUE),
+        solrParams.getLong(GENERATION));
+  }
+
   static boolean getBoolWithBackwardCompatibility(
       SolrParams params, String preferredKey, String alternativeKey, boolean 
defaultValue) {
     Boolean value = params.getBool(preferredKey);
@@ -674,29 +728,6 @@ public class ReplicationHandler extends RequestHandlerBase
     snapShooter.createSnapAsync(numberToKeep, result);
   }
 
-  /**
-   * This method adds an Object of FileStream to the response . The FileStream 
implements a custom
-   * protocol which is understood by IndexFetcher.FileFetcher
-   *
-   * @see IndexFetcher.LocalFsFileFetcher
-   * @see IndexFetcher.DirectoryFileFetcher
-   */
-  private void getFileStream(SolrParams solrParams, SolrQueryResponse rsp) {
-    ModifiableSolrParams rawParams = new ModifiableSolrParams(solrParams);
-    rawParams.set(CommonParams.WT, FILE_STREAM);
-
-    String cfileName = solrParams.get(CONF_FILE_SHORT);
-    String tlogFileName = solrParams.get(TLOG_FILE);
-    if (cfileName != null) {
-      rsp.add(FILE_STREAM, new LocalFsConfFileStream(solrParams));
-    } else if (tlogFileName != null) {
-      rsp.add(FILE_STREAM, new LocalFsTlogFileStream(solrParams));
-    } else {
-      rsp.add(FILE_STREAM, new DirectoryFileStream(solrParams));
-    }
-    rsp.add(STATUS, OK_STATUS);
-  }
-
   public IndexVersionResponse getIndexVersionResponse() throws IOException {
 
     IndexCommit commitPoint = indexCommitPoint; // make a copy so it won't 
change
@@ -908,7 +939,7 @@ public class ReplicationHandler extends RequestHandlerBase
               if (fetcher != null) {
                 map.put(LEADER_URL, fetcher.getLeaderCoreUrl());
                 if (getPollInterval() != null) {
-                  map.put(POLL_INTERVAL, getPollInterval());
+                  map.put(ReplicationAPIBase.POLL_INTERVAL, getPollInterval());
                 }
                 map.put("isPollingDisabled", isPollingDisabled());
                 map.put("isReplicating", isReplicating());
@@ -990,7 +1021,7 @@ public class ReplicationHandler extends RequestHandlerBase
       }
       follower.add(LEADER_URL, fetcher.getLeaderCoreUrl());
       if (getPollInterval() != null) {
-        follower.add(POLL_INTERVAL, getPollInterval());
+        follower.add(ReplicationAPIBase.POLL_INTERVAL, getPollInterval());
       }
       Date nextScheduled = getNextScheduledExecTime();
       if (nextScheduled != null && !isPollingDisabled()) {
@@ -1263,7 +1294,7 @@ public class ReplicationHandler extends RequestHandlerBase
     boolean enableFollower = isEnabled(follower);
     if (enableFollower) {
       currentIndexFetcher = pollingIndexFetcher = new IndexFetcher(follower, 
this, core);
-      setupPolling((String) follower.get(POLL_INTERVAL));
+      setupPolling((String) follower.get(ReplicationAPIBase.POLL_INTERVAL));
       isFollower = true;
     }
     NamedList<?> leader = getObjectWithBackwardCompatibility(initArgs, 
"leader", "master");
@@ -1513,257 +1544,11 @@ public class ReplicationHandler extends 
RequestHandlerBase
     };
   }
 
-  /** This class is used to read and send files in the lucene index */
-  private class DirectoryFileStream implements SolrCore.RawWriter {
-    protected SolrParams params;
-
-    protected FastOutputStream fos;
-
-    protected Long indexGen;
-    protected IndexDeletionPolicyWrapper delPolicy;
-
-    protected String fileName;
-    protected String cfileName;
-    protected String tlogFileName;
-    protected String sOffset;
-    protected String sLen;
-    protected final boolean compress;
-    protected boolean useChecksum;
-
-    protected long offset = -1;
-    protected int len = -1;
-
-    protected Checksum checksum;
-
-    private RateLimiter rateLimiter;
-
-    byte[] buf;
-
-    public DirectoryFileStream(SolrParams solrParams) {
-      params = solrParams;
-      delPolicy = core.getDeletionPolicy();
-
-      fileName = validateFilenameOrError(params.get(FILE));
-      cfileName = validateFilenameOrError(params.get(CONF_FILE_SHORT));
-      tlogFileName = validateFilenameOrError(params.get(TLOG_FILE));
-
-      sOffset = params.get(OFFSET);
-      sLen = params.get(LEN);
-      compress = Boolean.parseBoolean(params.get(COMPRESSION));
-      useChecksum = params.getBool(CHECKSUM, false);
-      indexGen = params.getLong(GENERATION);
-      if (useChecksum) {
-        checksum = new Adler32();
-      }
-      // No throttle if MAX_WRITE_PER_SECOND is not specified
-      double maxWriteMBPerSec = params.getDouble(MAX_WRITE_PER_SECOND, 
Double.MAX_VALUE);
-      rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
-    }
-
-    // Throw exception on directory traversal attempts
-    protected String validateFilenameOrError(String fileName) {
-      if (fileName != null) {
-        Path filePath = Paths.get(fileName);
-        filePath.forEach(
-            subpath -> {
-              if ("..".equals(subpath.toString())) {
-                throw new SolrException(ErrorCode.FORBIDDEN, "File name cannot 
contain ..");
-              }
-            });
-        if (filePath.isAbsolute()) {
-          throw new SolrException(ErrorCode.FORBIDDEN, "File name must be 
relative");
-        }
-        return fileName;
-      } else return null;
-    }
-
-    protected void initWrite() throws IOException {
-      if (sOffset != null) offset = Long.parseLong(sOffset);
-      if (sLen != null) len = Integer.parseInt(sLen);
-      if (fileName == null && cfileName == null && tlogFileName == null) {
-        // no filename do nothing
-        writeNothingAndFlush();
-      }
-      buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
-
-      // reserve commit point till write is complete
-      if (indexGen != null) {
-        delPolicy.saveCommitPoint(indexGen);
-      }
-    }
-
-    protected void createOutputStream(OutputStream out) {
-      // DeflaterOutputStream requires a close call, but don't close the 
request outputstream
-      out = new CloseShieldOutputStream(out);
-      if (compress) {
-        fos = new FastOutputStream(new DeflaterOutputStream(out));
-      } else {
-        fos = new FastOutputStream(out);
-      }
-    }
-
-    protected void extendReserveAndReleaseCommitPoint() {
-      if (indexGen != null) {
-        // Reserve the commit point for another 10s for the next file to be to 
fetched.
-        // We need to keep extending the commit reservation between requests 
so that the replica can
-        // fetch all the files correctly.
-        delPolicy.setReserveDuration(indexGen, reserveCommitDuration);
-
-        // release the commit point as the write is complete
-        delPolicy.releaseCommitPoint(indexGen);
-      }
-    }
-
-    @Override
-    public void write(OutputStream out) throws IOException {
-      createOutputStream(out);
-
-      IndexInput in = null;
-      try {
-        initWrite();
-
-        Directory dir = core.withSearcher(searcher -> 
searcher.getIndexReader().directory());
-        in = dir.openInput(fileName, IOContext.READONCE);
-        // if offset is mentioned move the pointer to that point
-        if (offset != -1) in.seek(offset);
-
-        long filelen = dir.fileLength(fileName);
-        long maxBytesBeforePause = 0;
-
-        while (true) {
-          offset = offset == -1 ? 0 : offset;
-          int read = (int) Math.min(buf.length, filelen - offset);
-          in.readBytes(buf, 0, read);
-
-          fos.writeInt(read);
-          if (useChecksum) {
-            checksum.reset();
-            checksum.update(buf, 0, read);
-            fos.writeLong(checksum.getValue());
-          }
-          fos.write(buf, 0, read);
-          fos.flush();
-          log.debug("Wrote {} bytes for file {}", offset + read, fileName); // 
nowarn
-
-          // Pause if necessary
-          maxBytesBeforePause += read;
-          if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
-            rateLimiter.pause(maxBytesBeforePause);
-            maxBytesBeforePause = 0;
-          }
-          if (read != buf.length) {
-            writeNothingAndFlush();
-            // we close because DeflaterOutputStream requires a close call, 
but  the request
-            // outputstream is protected
-            fos.close();
-            break;
-          }
-          offset += read;
-          in.seek(offset);
-        }
-      } catch (IOException e) {
-        log.warn("Exception while writing response for params: {}", params, e);
-      } finally {
-        if (in != null) {
-          in.close();
-        }
-        extendReserveAndReleaseCommitPoint();
-      }
-    }
-
-    /** Used to write a marker for EOF */
-    protected void writeNothingAndFlush() throws IOException {
-      fos.writeInt(0);
-      fos.flush();
-    }
-  }
-
-  /** This is used to write files in the conf directory. */
-  private abstract class LocalFsFileStream extends DirectoryFileStream {
-
-    private Path file;
-
-    public LocalFsFileStream(SolrParams solrParams) {
-      super(solrParams);
-      this.file = this.initFile();
-    }
-
-    protected abstract Path initFile();
-
-    @Override
-    public void write(OutputStream out) throws IOException {
-      createOutputStream(out);
-      try {
-        initWrite();
-
-        if (Files.isReadable(file)) {
-          try (SeekableByteChannel channel = Files.newByteChannel(file)) {
-            // if offset is mentioned move the pointer to that point
-            if (offset != -1) channel.position(offset);
-            ByteBuffer bb = ByteBuffer.wrap(buf);
-
-            while (true) {
-              bb.clear();
-              long bytesRead = channel.read(bb);
-              if (bytesRead <= 0) {
-                writeNothingAndFlush();
-                // we close because DeflaterOutputStream requires a close 
call, but the request
-                // outputstream is protected
-                fos.close();
-                break;
-              }
-              fos.writeInt((int) bytesRead);
-              if (useChecksum) {
-                checksum.reset();
-                checksum.update(buf, 0, (int) bytesRead);
-                fos.writeLong(checksum.getValue());
-              }
-              fos.write(buf, 0, (int) bytesRead);
-              fos.flush();
-            }
-          }
-        } else {
-          writeNothingAndFlush();
-        }
-      } catch (IOException e) {
-        log.warn("Exception while writing response for params: {}", params, e);
-      } finally {
-        extendReserveAndReleaseCommitPoint();
-      }
-    }
-  }
-
-  private class LocalFsTlogFileStream extends LocalFsFileStream {
-
-    public LocalFsTlogFileStream(SolrParams solrParams) {
-      super(solrParams);
-    }
-
-    @Override
-    protected Path initFile() {
-      // if it is a tlog file read from tlog directory
-      return Path.of(core.getUpdateHandler().getUpdateLog().getTlogDir(), 
tlogFileName);
-    }
-  }
-
-  private class LocalFsConfFileStream extends LocalFsFileStream {
-
-    public LocalFsConfFileStream(SolrParams solrParams) {
-      super(solrParams);
-    }
-
-    @Override
-    protected Path initFile() {
-      // if it is a conf file read from config directory
-      return core.getResourceLoader().getConfigPath().resolve(cfileName);
-    }
-  }
-
-  private static Long readIntervalMs(String interval) {
+  private Long readIntervalMs(String interval) {
     return TimeUnit.MILLISECONDS.convert(readIntervalNs(interval), 
TimeUnit.NANOSECONDS);
   }
 
-  private static Long readIntervalNs(String interval) {
+  private Long readIntervalNs(String interval) {
     if (interval == null) return null;
     int result = 0;
     Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
@@ -1813,8 +1598,6 @@ public class ReplicationHandler extends RequestHandlerBase
   public static final String LEGACY_SKIP_COMMIT_ON_LEADER_VERSION_ZERO =
       "skipCommitOnMasterVersionZero";
 
-  public static final String STATUS = "status";
-
   public static final String MESSAGE = "message";
 
   public static final String COMMAND = "command";
@@ -1849,24 +1632,8 @@ public class ReplicationHandler extends 
RequestHandlerBase
 
   public static final String CMD_DELETE_BACKUP = "deletebackup";
 
-  public static final String GENERATION = "generation";
-
-  public static final String OFFSET = "offset";
-
-  public static final String LEN = "len";
-
-  public static final String FILE = "file";
-
   public static final String SIZE = "size";
 
-  public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
-
-  public static final String CONF_FILE_SHORT = "cf";
-
-  public static final String TLOG_FILE = "tlogFile";
-
-  public static final String CHECKSUM = "checksum";
-
   public static final String ALIAS = "alias";
 
   public static final String CONF_CHECKSUM = "confchecksum";
@@ -1875,21 +1642,8 @@ public class ReplicationHandler extends 
RequestHandlerBase
 
   public static final String REPLICATE_AFTER = "replicateAfter";
 
-  public static final String FILE_STREAM = "filestream";
-
-  public static final String POLL_INTERVAL = "pollInterval";
-
-  public static final String INTERVAL_ERR_MSG =
-      "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
-
-  private static final Pattern INTERVAL_PATTERN = 
Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
-
-  public static final int PACKET_SZ = 1024 * 1024; // 1MB
-
   public static final String RESERVE = "commitReserveDuration";
 
-  public static final String COMPRESSION = "compression";
-
   public static final String EXTERNAL = "external";
 
   public static final String INTERNAL = "internal";
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java 
b/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
index a3724aa626c..897d9921e2c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/HealthCheckHandler.java
@@ -20,7 +20,7 @@ package org.apache.solr.handler.admin;
 import static org.apache.solr.common.params.CommonParams.FAILURE;
 import static org.apache.solr.common.params.CommonParams.OK;
 import static org.apache.solr.common.params.CommonParams.STATUS;
-import static org.apache.solr.handler.ReplicationHandler.GENERATION;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.GENERATION;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java 
b/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
index c4071ced8fc..1389c6e780f 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/api/CoreReplication.java
@@ -19,10 +19,12 @@ package org.apache.solr.handler.admin.api;
 import static 
org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;
 
 import jakarta.inject.Inject;
+import jakarta.ws.rs.core.StreamingOutput;
 import java.io.IOException;
 import org.apache.solr.client.api.endpoint.ReplicationApis;
 import org.apache.solr.client.api.model.FileListResponse;
 import org.apache.solr.client.api.model.IndexVersionResponse;
+import org.apache.solr.common.SolrException;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.jersey.PermissionName;
 import org.apache.solr.request.SolrQueryRequest;
@@ -51,4 +53,22 @@ public class CoreReplication extends ReplicationAPIBase 
implements ReplicationAp
   public FileListResponse fetchFileList(long gen) {
     return doFetchFileList(gen);
   }
+
+  @Override
+  @PermissionName(CORE_READ_PERM)
+  public StreamingOutput fetchFile(
+      String filePath,
+      String dirType,
+      String offset,
+      String len,
+      Boolean compression,
+      Boolean checksum,
+      double maxWriteMBPerSec,
+      Long gen) {
+    if (dirType == null) {
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Must 
provide a dirType ");
+    }
+    return doFetchFile(
+        filePath, dirType, offset, len, compression, checksum, 
maxWriteMBPerSec, gen);
+  }
 }
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java 
b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
index 0f5e9d5dcae..638312e592d 100644
--- 
a/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
+++ 
b/solr/core/src/java/org/apache/solr/handler/admin/api/ReplicationAPIBase.java
@@ -19,11 +19,24 @@ package org.apache.solr.handler.admin.api;
 import static org.apache.solr.handler.ReplicationHandler.ERR_STATUS;
 import static org.apache.solr.handler.ReplicationHandler.OK_STATUS;
 
+import jakarta.ws.rs.core.StreamingOutput;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.invoke.MethodHandles;
+import java.nio.ByteBuffer;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.regex.Pattern;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+import java.util.zip.DeflaterOutputStream;
+import org.apache.commons.io.output.CloseShieldOutputStream;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.index.IndexCommit;
 import org.apache.lucene.index.SegmentCommitInfo;
@@ -31,10 +44,13 @@ import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.RateLimiter;
 import org.apache.solr.api.JerseyResource;
 import org.apache.solr.client.api.model.FileListResponse;
 import org.apache.solr.client.api.model.FileMetaData;
 import org.apache.solr.client.api.model.IndexVersionResponse;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.util.FastOutputStream;
 import org.apache.solr.core.DirectoryFactory;
 import org.apache.solr.core.IndexDeletionPolicyWrapper;
 import org.apache.solr.core.SolrCore;
@@ -47,7 +63,24 @@ import org.slf4j.LoggerFactory;
 /** A common parent for "replication" (i.e. replication-level) APIs. */
 public abstract class ReplicationAPIBase extends JerseyResource {
 
+  public static final String CONF_FILE_SHORT = "cf";
+  public static final String TLOG_FILE = "tlogFile";
+  public static final String FILE_STREAM = "filestream";
+  public static final String STATUS = "status";
+  public static final int PACKET_SZ = 1024 * 1024; // 1MB
+  public static final String GENERATION = "generation";
+  public static final String OFFSET = "offset";
+  public static final String LEN = "len";
+  public static final String FILE = "file";
+  public static final String MAX_WRITE_PER_SECOND = "maxWriteMBPerSec";
+  public static final String CHECKSUM = "checksum";
+  public static final String COMPRESSION = "compression";
+  public static final String POLL_INTERVAL = "pollInterval";
+  public static final String INTERVAL_ERR_MSG =
+      "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
+
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final Pattern INTERVAL_PATTERN = 
Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
   protected final SolrCore solrCore;
   protected final SolrQueryRequest solrQueryRequest;
   protected final SolrQueryResponse solrQueryResponse;
@@ -71,6 +104,33 @@ public abstract class ReplicationAPIBase extends 
JerseyResource {
     return getFileList(generation, replicationHandler);
   }
 
+  protected DirectoryFileStream doFetchFile(
+      String filePath,
+      String dirType,
+      String offset,
+      String len,
+      boolean compression,
+      boolean checksum,
+      double maxWriteMBPerSec,
+      Long gen) {
+    DirectoryFileStream dfs;
+    if (Objects.equals(dirType, CONF_FILE_SHORT)) {
+      dfs =
+          new LocalFsConfFileStream(
+              filePath, dirType, offset, len, compression, checksum, 
maxWriteMBPerSec, gen);
+    } else if (Objects.equals(dirType, TLOG_FILE)) {
+      dfs =
+          new LocalFsTlogFileStream(
+              filePath, dirType, offset, len, compression, checksum, 
maxWriteMBPerSec, gen);
+    } else {
+      dfs =
+          new DirectoryFileStream(
+              filePath, dirType, offset, len, compression, checksum, 
maxWriteMBPerSec, gen);
+    }
+    solrQueryResponse.add(FILE_STREAM, dfs);
+    return dfs;
+  }
+
   protected FileListResponse getFileList(long generation, ReplicationHandler 
replicationHandler) {
     final IndexDeletionPolicyWrapper delPol = solrCore.getDeletionPolicy();
     final FileListResponse filesResponse = new FileListResponse();
@@ -186,6 +246,316 @@ public abstract class ReplicationAPIBase extends 
JerseyResource {
     return filesResponse;
   }
 
+  /** This class is used to read and send files in the lucene index */
+  protected class DirectoryFileStream implements SolrCore.RawWriter, 
StreamingOutput {
+    protected FastOutputStream fos;
+
+    protected Long indexGen;
+    protected IndexDeletionPolicyWrapper delPolicy;
+
+    protected String fileName;
+    protected String cfileName;
+    protected String tlogFileName;
+    protected String sOffset;
+    protected String sLen;
+    protected final boolean compress;
+    protected boolean useChecksum;
+
+    protected long offset = -1;
+    protected int len = -1;
+
+    protected Checksum checksum;
+
+    private RateLimiter rateLimiter;
+
+    byte[] buf;
+
+    public DirectoryFileStream(
+        String file,
+        String dirType,
+        String offset,
+        String len,
+        boolean compression,
+        boolean useChecksum,
+        double maxWriteMBPerSec,
+        Long gen) {
+      delPolicy = solrCore.getDeletionPolicy();
+
+      fileName = validateFilenameOrError(file);
+
+      switch (dirType) {
+        case CONF_FILE_SHORT:
+          cfileName = file;
+          break;
+        case TLOG_FILE:
+          tlogFileName = file;
+          break;
+        default:
+          fileName = file;
+          break;
+      }
+
+      this.sOffset = offset;
+      this.sLen = len;
+      this.compress = compression;
+      this.useChecksum = useChecksum;
+      this.indexGen = gen;
+      if (useChecksum) {
+        checksum = new Adler32();
+      }
+      // No throttle if MAX_WRITE_PER_SECOND is not specified
+      if (maxWriteMBPerSec == 0) {
+        this.rateLimiter = new RateLimiter.SimpleRateLimiter(Double.MAX_VALUE);
+      } else {
+        this.rateLimiter = new RateLimiter.SimpleRateLimiter(maxWriteMBPerSec);
+      }
+    }
+
+    // Throw exception on directory traversal attempts
+    protected String validateFilenameOrError(String fileName) {
+      if (fileName != null) {
+        Path filePath = Paths.get(fileName);
+        filePath.forEach(
+            subpath -> {
+              if ("..".equals(subpath.toString())) {
+                throw new SolrException(
+                    SolrException.ErrorCode.FORBIDDEN, "File name cannot 
contain ..");
+              }
+            });
+        if (filePath.isAbsolute()) {
+          throw new SolrException(SolrException.ErrorCode.FORBIDDEN, "File 
name must be relative");
+        }
+        return fileName;
+      } else return null;
+    }
+
+    protected void initWrite() throws IOException {
+      this.offset = (sOffset != null) ? Long.parseLong(sOffset) : -1;
+      this.len = (sLen != null) ? Integer.parseInt(sLen) : -1;
+      if (fileName == null && cfileName == null && tlogFileName == null) {
+        // no filename do nothing
+        writeNothingAndFlush();
+      }
+      buf = new byte[(len == -1 || len > PACKET_SZ) ? PACKET_SZ : len];
+
+      // reserve commit point till write is complete
+      if (indexGen != null) {
+        delPolicy.saveCommitPoint(indexGen);
+      }
+    }
+
+    protected void createOutputStream(OutputStream out) {
+      // DeflaterOutputStream requires a close call, but don't close the 
request outputstream
+      out = new CloseShieldOutputStream(out);
+      if (compress) {
+        fos = new FastOutputStream(new DeflaterOutputStream(out));
+      } else {
+        fos = new FastOutputStream(out);
+      }
+    }
+
+    protected void extendReserveAndReleaseCommitPoint() {
+      if (indexGen != null) {
+        // Look the handler up only when a commit point was actually reserved.
+        ReplicationHandler replicationHandler =
+            (ReplicationHandler) 
solrCore.getRequestHandler(ReplicationHandler.PATH);
+        // Reserve the commit point for another 10s for the next file to be to 
fetched.
+        // We need to keep extending the commit reservation between requests 
so that the replica can
+        // fetch all the files correctly.
+        delPolicy.setReserveDuration(indexGen, 
replicationHandler.getReserveCommitDuration());
+
+        // release the commit point as the write is complete
+        delPolicy.releaseCommitPoint(indexGen);
+      }
+    }
+
+    @Override
+    public void write(OutputStream out) throws IOException {
+      createOutputStream(out);
+
+      IndexInput in = null;
+      try {
+        initWrite();
+
+        Directory dir = solrCore.withSearcher(searcher -> 
searcher.getIndexReader().directory());
+        in = dir.openInput(fileName, IOContext.READONCE);
+        // if offset is mentioned move the pointer to that point
+        if (offset != -1) in.seek(offset);
+
+        long filelen = dir.fileLength(fileName);
+        long maxBytesBeforePause = 0;
+
+        while (true) {
+          offset = offset == -1 ? 0 : offset;
+          int read = (int) Math.min(buf.length, filelen - offset);
+          in.readBytes(buf, 0, read);
+
+          fos.writeInt(read);
+          if (useChecksum) {
+            checksum.reset();
+            checksum.update(buf, 0, read);
+            fos.writeLong(checksum.getValue());
+          }
+          fos.write(buf, 0, read);
+          fos.flush();
+          log.debug("Wrote {} bytes for file {}", offset + read, fileName); // 
nowarn
+
+          // Pause if necessary
+          maxBytesBeforePause += read;
+          if (maxBytesBeforePause >= rateLimiter.getMinPauseCheckBytes()) {
+            rateLimiter.pause(maxBytesBeforePause);
+            maxBytesBeforePause = 0;
+          }
+          if (read != buf.length) {
+            writeNothingAndFlush();
+            // we close because DeflaterOutputStream requires a close call, 
but  the request
+            // outputstream is protected
+            fos.close();
+            break;
+          }
+          offset += read;
+          in.seek(offset);
+        }
+      } catch (IOException e) {
+        // Trailing exception argument makes SLF4J log the stack trace.
+        log.warn(
+            "Exception while writing response for params fileName={} 
cfileName={} tlogFileName={} offset={} len={} compression={} generation={} 
checksum={}",
+            fileName,
+            cfileName,
+            tlogFileName,
+            sOffset, sLen,
+            compress,
+            indexGen, useChecksum,
+            e);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+        extendReserveAndReleaseCommitPoint();
+      }
+    }
+
+    /** Used to write a marker for EOF */
+    protected void writeNothingAndFlush() throws IOException {
+      fos.writeInt(0);
+      fos.flush();
+    }
+  }
+
+  /** This is used to write files in the conf directory. */
+  protected abstract class LocalFsFileStream extends DirectoryFileStream {
+
+    private Path file;
+
+    public LocalFsFileStream(
+        String file,
+        String dirType,
+        String offset,
+        String len,
+        boolean compression,
+        boolean useChecksum,
+        double maxWriteMBPerSec,
+        Long gen) {
+      super(file, dirType, offset, len, compression, useChecksum, 
maxWriteMBPerSec, gen);
+      this.file = this.initFile();
+    }
+
+    protected abstract Path initFile();
+
+    @Override
+    public void write(OutputStream out) throws IOException {
+      createOutputStream(out);
+      try {
+        initWrite();
+
+        if (Files.isReadable(file)) {
+          try (SeekableByteChannel channel = Files.newByteChannel(file)) {
+            // if offset is mentioned move the pointer to that point
+            if (offset != -1) channel.position(offset);
+            ByteBuffer bb = ByteBuffer.wrap(buf);
+
+            while (true) {
+              bb.clear();
+              long bytesRead = channel.read(bb);
+              if (bytesRead <= 0) {
+                writeNothingAndFlush();
+                // we close because DeflaterOutputStream requires a close 
call, but the request
+                // outputstream is protected
+                fos.close();
+                break;
+              }
+              fos.writeInt((int) bytesRead);
+              if (useChecksum) {
+                checksum.reset();
+                checksum.update(buf, 0, (int) bytesRead);
+                fos.writeLong(checksum.getValue());
+              }
+              fos.write(buf, 0, (int) bytesRead);
+              fos.flush();
+            }
+          }
+        } else {
+          writeNothingAndFlush();
+        }
+      } catch (IOException e) {
+        // Trailing exception argument makes SLF4J log the stack trace.
+        log.warn(
+            "Exception while writing response for params fileName={} 
cfileName={} tlogFileName={} offset={} len={} compression={} generation={} 
checksum={}",
+            fileName,
+            cfileName,
+            tlogFileName,
+            sOffset, sLen,
+            compress,
+            indexGen, useChecksum,
+            e);
+      } finally {
+        extendReserveAndReleaseCommitPoint();
+      }
+    }
+  }
+
+  protected class LocalFsTlogFileStream extends LocalFsFileStream {
+
+    public LocalFsTlogFileStream(
+        String file,
+        String dirType,
+        String offset,
+        String len,
+        boolean compression,
+        boolean useChecksum,
+        double maxWriteMBPerSec,
+        Long gen) {
+      super(file, dirType, offset, len, compression, useChecksum, 
maxWriteMBPerSec, gen);
+    }
+
+    @Override
+    protected Path initFile() {
+      // if it is a tlog file read from tlog directory
+      return Path.of(solrCore.getUpdateHandler().getUpdateLog().getTlogDir(), 
tlogFileName);
+    }
+  }
+
+  protected class LocalFsConfFileStream extends LocalFsFileStream {
+
+    public LocalFsConfFileStream(
+        String file,
+        String dirType,
+        String offset,
+        String len,
+        boolean compression,
+        boolean useChecksum,
+        double maxWriteMBPerSec,
+        Long gen) {
+      super(file, dirType, offset, len, compression, useChecksum, 
maxWriteMBPerSec, gen);
+    }
+
+    @Override
+    protected Path initFile() {
+      // if it is a conf file read from config directory
+      return solrCore.getResourceLoader().getConfigPath().resolve(cfileName);
+    }
+  }
+
   private void reportErrorOnResponse(
       FileListResponse fileListResponse, String message, Exception e) {
     fileListResponse.status = ERR_STATUS;
diff --git a/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java 
b/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
index e2a907d07d4..9a96b34afc0 100644
--- a/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
+++ b/solr/core/src/java/org/apache/solr/handler/api/V2ApiUtils.java
@@ -19,7 +19,7 @@ package org.apache.solr.handler.api;
 
 import static 
org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
 import static org.apache.solr.common.params.CommonParams.WT;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
 
 import com.fasterxml.jackson.annotation.JsonAnyGetter;
 import com.fasterxml.jackson.annotation.JsonProperty;
diff --git 
a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java 
b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
index f0f01381c8c..903635be9c2 100644
--- a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
+++ b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandler.java
@@ -72,6 +72,7 @@ import org.apache.solr.core.SolrCore;
 import org.apache.solr.core.StandardDirectoryFactory;
 import org.apache.solr.core.snapshots.SolrSnapshotMetaDataManager;
 import org.apache.solr.embedded.JettySolrRunner;
+import org.apache.solr.handler.admin.api.ReplicationAPIBase;
 import org.apache.solr.security.AllowListUrlChecker;
 import org.apache.solr.util.TestInjection;
 import org.apache.solr.util.TimeOut;
@@ -1520,9 +1521,9 @@ public class TestReplicationHandler extends 
SolrTestCaseJ4 {
         Arrays.asList(absFile, "../dir/traversal", "illegal\rfile\nname\t");
     List<String> params =
         Arrays.asList(
-            ReplicationHandler.FILE,
-            ReplicationHandler.CONF_FILE_SHORT,
-            ReplicationHandler.TLOG_FILE);
+            ReplicationAPIBase.FILE,
+            ReplicationAPIBase.CONF_FILE_SHORT,
+            ReplicationAPIBase.TLOG_FILE);
     for (String param : params) {
       for (String filename : illegalFilenames) {
         expectThrows(
diff --git 
a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
 
b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
index c66a1dd7201..67b5cc47d9c 100644
--- 
a/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
+++ 
b/solr/core/src/test/org/apache/solr/handler/TestReplicationHandlerDiskOverFlow.java
@@ -214,6 +214,7 @@ public class TestReplicationHandlerDiskOverFlow extends 
SolrTestCaseJ4 {
                 .add("qt", "/replication")
                 .add("command", CMD_FETCH_INDEX)
                 .add("wait", "true"));
+
     assertEquals("Replication command status", "OK", 
response._getStr("status", null));
 
     assertEquals(
diff --git 
a/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
 
b/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
index bfde05348d5..be05cedbbc1 100644
--- 
a/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
+++ 
b/solr/core/src/test/org/apache/solr/handler/admin/api/CoreReplicationAPITest.java
@@ -21,6 +21,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import io.opentracing.noop.NoopSpan;
+import java.io.IOException;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -29,9 +31,12 @@ import org.apache.solr.client.api.model.FileListResponse;
 import org.apache.solr.client.api.model.FileMetaData;
 import org.apache.solr.client.api.model.IndexVersionResponse;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.handler.ReplicationHandler;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
+import org.apache.solr.update.UpdateHandler;
+import org.apache.solr.update.UpdateLog;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -42,8 +47,6 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
   private CoreReplication coreReplicationAPI;
   private SolrCore mockCore;
   private ReplicationHandler mockReplicationHandler;
-  private SolrQueryRequest mockQueryRequest;
-  private SolrQueryResponse queryResponse;
 
   @BeforeClass
   public static void ensureWorkingMockito() {
@@ -55,9 +58,9 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
   public void setUp() throws Exception {
     super.setUp();
     setUpMocks();
-    mockQueryRequest = mock(SolrQueryRequest.class);
+    final var mockQueryRequest = mock(SolrQueryRequest.class);
     when(mockQueryRequest.getSpan()).thenReturn(NoopSpan.INSTANCE);
-    queryResponse = new SolrQueryResponse();
+    final var queryResponse = new SolrQueryResponse();
     coreReplicationAPI = new CoreReplicationAPIMock(mockCore, 
mockQueryRequest, queryResponse);
   }
 
@@ -81,10 +84,37 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
     assertEquals(123456789, actualResponse.fileList.get(0).checksum);
   }
 
-  private void setUpMocks() {
+  @Test
+  public void testFetchFile() throws Exception {
+    ReplicationAPIBase.DirectoryFileStream actual =
+        coreReplicationAPI.doFetchFile("./test", "file", null, null, false, 
false, 0, null);
+    assertNotNull(actual);
+
+    actual =
+        coreReplicationAPI.doFetchFile("./test", "tlogFile", null, null, 
false, false, 0, null);
+    assertTrue(actual instanceof ReplicationAPIBase.LocalFsTlogFileStream);
+
+    actual = coreReplicationAPI.doFetchFile("./test", "cf", null, null, false, 
false, 0, null);
+    assertTrue(actual instanceof ReplicationAPIBase.LocalFsConfFileStream);
+  }
+
+  private void setUpMocks() throws IOException {
     mockCore = mock(SolrCore.class);
     mockReplicationHandler = mock(ReplicationHandler.class);
+
+    // Mocks for LocalFsTlogFileStream
+    UpdateHandler mockUpdateHandler = mock(UpdateHandler.class);
+    UpdateLog mockUpdateLog = mock(UpdateLog.class);
+    when(mockUpdateHandler.getUpdateLog()).thenReturn(mockUpdateLog);
+    when(mockUpdateLog.getTlogDir()).thenReturn("ignore");
+
+    // Mocks for LocalFsConfFileStream
+    SolrResourceLoader mockSolrResourceLoader = mock(SolrResourceLoader.class);
+    Path mockPath = mock(Path.class);
     
when(mockCore.getRequestHandler(ReplicationHandler.PATH)).thenReturn(mockReplicationHandler);
+    when(mockCore.getUpdateHandler()).thenReturn(mockUpdateHandler);
+    when(mockCore.getResourceLoader()).thenReturn(mockSolrResourceLoader);
+    when(mockSolrResourceLoader.getConfigPath()).thenReturn(mockPath);
   }
 
   private static class CoreReplicationAPIMock extends CoreReplication {
@@ -94,7 +124,7 @@ public class CoreReplicationAPITest extends SolrTestCaseJ4 {
 
     @Override
     protected FileListResponse getFileList(long generation, ReplicationHandler 
replicationHandler) {
-      final FileListResponse filesResponse = new FileListResponse();
+      final var filesResponse = new FileListResponse();
       List<FileMetaData> fileMetaData = Arrays.asList(new FileMetaData(123, 
"test", 123456789));
       filesResponse.fileList = new ArrayList<>(fileMetaData);
       return filesResponse;
diff --git a/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java 
b/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
index 51b89243dbe..78f2f8d5fb9 100644
--- a/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
+++ b/solr/core/src/test/org/apache/solr/handler/api/V2ApiUtilsTest.java
@@ -17,7 +17,7 @@
 package org.apache.solr.handler.api;
 
 import static 
org.apache.solr.client.solrj.impl.BinaryResponseParser.BINARY_CONTENT_TYPE_V2;
-import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
+import static org.apache.solr.handler.admin.api.ReplicationAPIBase.FILE_STREAM;
 
 import jakarta.ws.rs.core.MediaType;
 import org.apache.solr.SolrTestCaseJ4;
diff --git 
a/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
 
b/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
index 1a65a8b9aea..353fdcf598a 100644
--- 
a/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
+++ 
b/solr/solr-ref-guide/modules/deployment-guide/pages/user-managed-index-replication.adoc
@@ -457,6 +457,51 @@ 
http://_host:port_/api/cores/_core_name_/replication/files?generation=<_generati
 +
 You can discover the generation number of the index by running the 
`indexversion` command.
 
+`filecontent`::
+Retrieve the contents of a specific file from a core as a stream.
+
++
+====
+[.tab-label]*V1 API*
+
+[source,bash]
+----
+http://_host:port_/solr/_core_name_/replication?command=filecontent&<_directory-type_>=<_file-path_>&wt=filestream
+
+----
+====
++
+====
+[.tab-label]*V2 API*
+
+[source,bash]
+----
+http://_host:port_/api/cores/_core_name_/replication/files/<_file-path_>?dirType=<_directory-type_>
+
+----
+====
++
+Directory type is required for both V1 and V2 with these supported types:
+
+* `file` Read from Lucene index file directory
+
+* `cf` Read from Configuration file directory
+
+* `tlogFile` Read from Tlog file directory
+
++
+There are several optional parameters:
+
+* `offset` Output stream read offset
+
+* `compression` True/False compress file output
+
+* `checksum` True/False write checksum with output stream
+
+* `maxWriteMBPerSec` Maximum write rate, in MB per second. Defaults to no throttling
+
+* `generation` The generation number of the index
+
 `backup`::
 Create a backup on leader if there are committed index data in the server; 
otherwise, does nothing.
 +

Reply via email to