This is an automated email from the ASF dual-hosted git repository.
epugh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 4b9e5392575 SOLR-14673: Add bin/solr stream CLI (#2479)
4b9e5392575 is described below
commit 4b9e539257512801cf740d9bc142a95a99576103
Author: Eric Pugh <[email protected]>
AuthorDate: Wed Nov 13 15:32:46 2024 -0500
SOLR-14673: Add bin/solr stream CLI (#2479)
* Allows you to run a streaming expression on the Solr server, using the
/stream end point.
* Allows you to run a streaming expression locally, by specifying
--execution=local.
---
solr/CHANGES.txt | 2 +
solr/bin/solr.cmd | 4 +-
.../core/src/java/org/apache/solr/cli/SolrCLI.java | 4 +-
.../src/java/org/apache/solr/cli/StreamTool.java | 531 +++++++++++++++++++++
.../java/org/apache/solr/handler/CatStream.java | 5 +-
.../test/org/apache/solr/cli/StreamToolTest.java | 366 ++++++++++++++
solr/packaging/test/test_stream.bats | 86 ++++
.../modules/query-guide/pages/stream-tool.adoc | 176 +++++++
.../query-guide/pages/streaming-expressions.adoc | 4 +
.../modules/query-guide/querying-nav.adoc | 1 +
.../solr/client/solrj/io/SolrClientCache.java | 29 +-
.../solr/client/solrj/io/stream/LetStream.java | 5 +
12 files changed, 1197 insertions(+), 16 deletions(-)
diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 874adfcc4d7..93e737a543d 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -12,6 +12,8 @@ New Features
* SOLR-17467: Solr CLI bin/solr start defaults to starting Solr in Cloud mode,
use --user-managed switch for User Managed (aka Standalone) mode. (Eric Pugh)
+* SOLR-14673: Solr CLI now has bin/solr stream tool that executes streaming
expressions via the command line, either locally or on a Solr cluster. (Eric Pugh)
+
Improvements
---------------------
diff --git a/solr/bin/solr.cmd b/solr/bin/solr.cmd
index 5fd6ec44aea..9f875926517 100755
--- a/solr/bin/solr.cmd
+++ b/solr/bin/solr.cmd
@@ -1175,9 +1175,9 @@ for %%a in (%*) do (
) else (
set "option!option!=%%a"
if "!option!" equ "-s" set "SOLR_HOME=%%a"
- if "!option!" equ "--solr-home" set "SOLR_HOME=%%a"
+ if "!option!" equ "--solr-home" set "SOLR_HOME=%%a"
if "!option!" equ "-d" set "SOLR_SERVER_DIR=%%a"
- if "!option!" equ "--server-dir" set "SOLR_SERVER_DIR=%%a"
+ if "!option!" equ "--server-dir" set "SOLR_SERVER_DIR=%%a"
if not "!option!" equ "-s" if not "!option!" equ "--solr-home" if not
"!option!" equ "-d" if not "!option!" equ "--server-dir" (
set "AUTH_PARAMS=!AUTH_PARAMS! !option! %%a"
)
diff --git a/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
b/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
index 00a97b4434c..4714c43c99c 100755
--- a/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
+++ b/solr/core/src/java/org/apache/solr/cli/SolrCLI.java
@@ -246,6 +246,7 @@ public class SolrCLI implements CLIO {
else if ("post".equals(toolType)) return new PostTool();
else if ("postlogs".equals(toolType)) return new PostLogsTool();
else if ("version".equals(toolType)) return new VersionTool();
+ else if ("stream".equals(toolType)) return new StreamTool();
else if ("snapshot-create".equals(toolType)) return new
SnapshotCreateTool();
else if ("snapshot-delete".equals(toolType)) return new
SnapshotDeleteTool();
else if ("snapshot-list".equals(toolType)) return new SnapshotListTool();
@@ -511,8 +512,7 @@ public class SolrCLI implements CLIO {
print("Usage: solr COMMAND OPTIONS");
print(" where COMMAND is one of: start, stop, restart, status, ");
print(
- " healthcheck, create, delete, auth,
assert, config, export, api, package, post, ");
-
+ " healthcheck, create, delete, auth,
assert, config, export, api, package, post, stream,");
print(
" zk ls, zk cp, zk rm , zk mv, zk
mkroot, zk upconfig, zk downconfig,");
print(
diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java
b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
new file mode 100644
index 00000000000..9c0392ec71b
--- /dev/null
+++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.cli;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.LineNumberReader;
+import java.io.PrintStream;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.solr.client.solrj.io.Lang;
+import org.apache.solr.client.solrj.io.SolrClientCache;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.io.comp.StreamComparator;
+import org.apache.solr.client.solrj.io.stream.PushBackStream;
+import org.apache.solr.client.solrj.io.stream.SolrStream;
+import org.apache.solr.client.solrj.io.stream.StreamContext;
+import org.apache.solr.client.solrj.io.stream.TupleStream;
+import org.apache.solr.client.solrj.io.stream.expr.Explanation;
+import org.apache.solr.client.solrj.io.stream.expr.Expressible;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
+import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
+import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.handler.CatStream;
+
+/** Supports stream command in the bin/solr script. */
+public class StreamTool extends ToolBase {
+
  /** Creates the tool writing to the CLI's standard output stream. */
  public StreamTool() {
    this(CLIO.getOutStream());
  }

  /**
   * Creates the tool writing to the given stream.
   *
   * @param stdout destination for tool output
   */
  public StreamTool(PrintStream stdout) {
    super(stdout);
  }

  // Shared cache of Solr clients for this invocation; closed at the end of runImpl().
  private final SolrClientCache solrClientCache = new SolrClientCache();
+
  @Override
  public String getName() {
    return "stream";
  }

  @Override
  public String getUsage() {
    // Specify that the last argument is the streaming expression
    return "bin/solr stream [--array-delimiter <CHARACTER>] [-c <NAME>] [--delimiter <CHARACTER>] [-e <ENVIRONMENT>] [-f\n"
        + "    <FIELDS>] [-h] [--header] [-s <HOST>] [-u <credentials>] [-v] [-z <HOST>] <streaming expression OR stream_file.expr>\n";
  }
+
+ private static final Option EXECUTION_OPTION =
+ Option.builder("e")
+ .longOpt("execution")
+ .hasArg()
+ .argName("ENVIRONMENT")
+ .desc(
+ "Execution environment is either 'local' (i.e CLI process) or
via a 'remote' Solr server. Default environment is 'remote'.")
+ .build();
+
+ private static final Option COLLECTION_OPTION =
+ Option.builder("c")
+ .longOpt("name")
+ .argName("NAME")
+ .hasArg()
+ .desc(
+ "Name of the specific collection to execute expression on if the
execution is set to 'remote'. Required for 'remote' execution environment.")
+ .build();
+
+ private static final Option FIELDS_OPTION =
+ Option.builder("f")
+ .longOpt("fields")
+ .argName("FIELDS")
+ .hasArg()
+ .desc(
+ "The fields in the tuples to output. Defaults to fields in the
first tuple of result set.")
+ .build();
+
+ private static final Option HEADER_OPTION =
+ Option.builder().longOpt("header").desc("Specify to include a header
line.").build();
+
+ private static final Option DELIMITER_OPTION =
+ Option.builder()
+ .longOpt("delimiter")
+ .argName("CHARACTER")
+ .hasArg()
+ .desc("The output delimiter. Default to using three spaces.")
+ .build();
+ private static final Option ARRAY_DELIMITER_OPTION =
+ Option.builder()
+ .longOpt("array-delimiter")
+ .argName("CHARACTER")
+ .hasArg()
+ .desc("The delimiter multi-valued fields. Default to using a pipe
(|) delimiter.")
+ .build();
+
  /** Declares the stream tool's options on top of the common tool options. */
  @Override
  public Options getOptions() {

    return super.getOptions()
        .addOption(EXECUTION_OPTION)
        .addOption(COLLECTION_OPTION)
        .addOption(FIELDS_OPTION)
        .addOption(HEADER_OPTION)
        .addOption(DELIMITER_OPTION)
        .addOption(ARRAY_DELIMITER_OPTION)
        .addOption(CommonCLIOptions.CREDENTIALS_OPTION)
        .addOptionGroup(getConnectionOptions());
  }
+
+ @Override
+ @SuppressWarnings({"rawtypes"})
+ public void runImpl(CommandLine cli) throws Exception {
+
+ String expressionArgument = cli.getArgs()[0];
+ String execution = cli.getOptionValue(EXECUTION_OPTION, "remote");
+ String arrayDelimiter = cli.getOptionValue(ARRAY_DELIMITER_OPTION, "|");
+ String delimiter = cli.getOptionValue(DELIMITER_OPTION, " ");
+ boolean includeHeaders = cli.hasOption(HEADER_OPTION);
+ String[] outputHeaders = getOutputFields(cli);
+
+ LineNumberReader bufferedReader = null;
+ String expr;
+ try {
+ Reader inputStream =
+ expressionArgument.toLowerCase(Locale.ROOT).endsWith(".expr")
+ ? new InputStreamReader(
+ new FileInputStream(expressionArgument),
Charset.defaultCharset())
+ : new StringReader(expressionArgument);
+
+ bufferedReader = new LineNumberReader(inputStream);
+ expr = StreamTool.readExpression(bufferedReader, cli.getArgs());
+ echoIfVerbose("Running Expression: " + expr);
+ } finally {
+ if (bufferedReader != null) {
+ bufferedReader.close();
+ }
+ }
+
+ PushBackStream pushBackStream;
+ if (execution.equalsIgnoreCase("local")) {
+ pushBackStream = doLocalMode(cli, expr);
+ } else {
+ pushBackStream = doRemoteMode(cli, expr);
+ }
+
+ try {
+ pushBackStream.open();
+
+ if (outputHeaders == null) {
+
+ Tuple tuple = pushBackStream.read();
+
+ if (!tuple.EOF) {
+ outputHeaders = getHeadersFromFirstTuple(tuple);
+ }
+
+ pushBackStream.pushBack(tuple);
+ }
+
+ if (includeHeaders) {
+ StringBuilder headersOut = new StringBuilder();
+ if (outputHeaders != null) {
+ for (int i = 0; i < outputHeaders.length; i++) {
+ if (i > 0) {
+ headersOut.append(delimiter);
+ }
+ headersOut.append(outputHeaders[i]);
+ }
+ }
+ CLIO.out(headersOut.toString());
+ }
+
+ while (true) {
+ Tuple tuple = pushBackStream.read();
+ if (tuple.EOF) {
+ break;
+ } else {
+ StringBuilder outLine = new StringBuilder();
+ if (outputHeaders != null) {
+ for (int i = 0; i < outputHeaders.length; i++) {
+ if (i > 0) {
+ outLine.append(delimiter);
+ }
+
+ Object o = tuple.get(outputHeaders[i]);
+ if (o != null) {
+ if (o instanceof List) {
+ List outfields = (List) o;
+ outLine.append(listToString(outfields, arrayDelimiter));
+ } else {
+ outLine.append(o);
+ }
+ }
+ }
+ }
+ CLIO.out(outLine.toString());
+ }
+ }
+ } finally {
+
+ if (pushBackStream != null) {
+ pushBackStream.close();
+ }
+
+ solrClientCache.close();
+ }
+
+ echoIfVerbose("StreamTool -- Done.");
+ }
+
+ /**
+ * Runs a streaming expression in the local process of the CLI.
+ *
+ * <p>Running locally means that parallelization support or those
expressions requiring access to
+ * internal Solr capabilities will not function.
+ *
+ * @param cli The CLI invoking the call
+ * @param expr The streaming expression to be parsed and in the context of
the CLI process
+ * @return A connection to the streaming expression that receives Tuples as
they are emitted
+ * locally.
+ */
+ private PushBackStream doLocalMode(CommandLine cli, String expr) throws
Exception {
+ String zkHost = SolrCLI.getZkHost(cli);
+
+ echoIfVerbose("Connecting to ZooKeeper at " + zkHost);
+ solrClientCache.getCloudSolrClient(zkHost);
+ solrClientCache.setBasicAuthCredentials(
+ cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION));
+
+ TupleStream stream;
+ PushBackStream pushBackStream;
+
+ StreamExpression streamExpression = StreamExpressionParser.parse(expr);
+ StreamFactory streamFactory = new StreamFactory();
+
+ // stdin is ONLY available in the local mode, not in the remote mode as it
+ // requires access to System.in
+ streamFactory.withFunctionName("stdin", StandardInStream.class);
+
+ // LocalCatStream extends CatStream and disables the Solr cluster specific
+ // logic about where to read data from.
+ streamFactory.withFunctionName("cat", LocalCatStream.class);
+
+ streamFactory.withDefaultZkHost(zkHost);
+
+ Lang.register(streamFactory);
+
+ stream = StreamTool.constructStream(streamFactory, streamExpression);
+
+ pushBackStream = new PushBackStream(stream);
+
+ // Now we can run the stream and return the results.
+ StreamContext streamContext = new StreamContext();
+ streamContext.setSolrClientCache(solrClientCache);
+
+ // Output the headers
+ pushBackStream.setStreamContext(streamContext);
+
+ return pushBackStream;
+ }
+
+ /**
+ * Runs a streaming expression on a Solr collection via the /stream end
point and returns the
+ * results to the CLI. Requires a collection to be specified to send the
expression to.
+ *
+ * <p>Running remotely allows you to use all the standard Streaming
Expression capabilities as the
+ * expression is running in a Solr environment.
+ *
+ * @param cli The CLI invoking the call
+ * @param expr The streaming expression to be parsed and run remotely
+ * @return A connection to the streaming expression that receives Tuples as
they are emitted from
+ * Solr /stream.
+ */
+ private PushBackStream doRemoteMode(CommandLine cli, String expr) throws
Exception {
+
+ String solrUrl = SolrCLI.normalizeSolrUrl(cli);
+ if (!cli.hasOption("name")) {
+ throw new IllegalStateException(
+ "You must provide --name COLLECTION with --worker solr parameter.");
+ }
+ String collection = cli.getOptionValue("name");
+
+ if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) {
+ throw new IllegalStateException(
+ "The stdin() expression is only usable with --worker local set up.");
+ }
+
+ final SolrStream solrStream =
+ new SolrStream(solrUrl + "/solr/" + collection, params("qt",
"/stream", "expr", expr));
+
+ String credentials =
cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION);
+ if (credentials != null) {
+ String username = credentials.split(":")[0];
+ String password = credentials.split(":")[1];
+ solrStream.setCredentials(username, password);
+ }
+ return new PushBackStream(solrStream);
+ }
+
+ private static ModifiableSolrParams params(String... params) {
+ if (params.length % 2 != 0) throw new RuntimeException("Params length
should be even");
+ ModifiableSolrParams msp = new ModifiableSolrParams();
+ for (int i = 0; i < params.length; i += 2) {
+ msp.add(params[i], params[i + 1]);
+ }
+ return msp;
+ }
+
  /**
   * TupleStream that emits one tuple per line read from standard input. Registered as the
   * stdin() function for local execution only, since it needs this process's System.in.
   */
  public static class StandardInStream extends TupleStream implements Expressible {

    private BufferedReader reader;
    // Defaults to System.in; replaceable (e.g. by tests) via setInputStream().
    private InputStream inputStream = System.in;
    // Only close a stream we were explicitly handed; never close System.in.
    private boolean doClose = false;

    public StandardInStream() {}

    /** Expression-parser constructor; stdin() takes no arguments, so nothing to read here. */
    public StandardInStream(StreamExpression expression, StreamFactory factory)
        throws IOException {}

    @Override
    public List<TupleStream> children() {
      return null;
    }

    /** Overrides the input source; the supplied stream WILL be closed by close(). */
    public void setInputStream(InputStream inputStream) {
      this.inputStream = inputStream;
      this.doClose = true;
    }

    @Override
    public void open() {
      reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws IOException {
      if (doClose) {
        inputStream.close();
      }
    }

    /** Emits a tuple with "line" and "file" entries per input line, or an EOF tuple at end. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override
    public Tuple read() throws IOException {
      String line = reader.readLine();
      HashMap map = new HashMap();
      Tuple tuple = new Tuple(map);
      if (line != null) {
        tuple.put("line", line);
        tuple.put("file", "cat");
      } else {
        tuple.put("EOF", "true");
      }
      return tuple;
    }

    @Override
    public void setStreamContext(StreamContext context) {}

    @Override
    public StreamExpression toExpression(StreamFactory factory) {
      return null;
    }

    @Override
    public Explanation toExplanation(StreamFactory factory) {
      return null;
    }

    @Override
    public StreamComparator getStreamSort() {
      return null;
    }
  }
+
+ static String[] getOutputFields(CommandLine cli) {
+ if (cli.hasOption(FIELDS_OPTION)) {
+
+ String fl = cli.getOptionValue(FIELDS_OPTION);
+ String[] flArray = fl.split(",");
+ String[] outputHeaders = new String[flArray.length];
+
+ for (int i = 0; i < outputHeaders.length; i++) {
+ outputHeaders[i] = flArray[i].trim();
+ }
+
+ return outputHeaders;
+
+ } else {
+ return null;
+ }
+ }
+
  /**
   * CatStream variant for local execution: it reads files directly from the local filesystem
   * rather than the Solr node's sandbox, and needs no Solr core from the stream context.
   */
  public static class LocalCatStream extends CatStream {

    public LocalCatStream(StreamExpression expression, StreamFactory factory) throws IOException {
      super(expression, factory);
    }

    public LocalCatStream(String commaDelimitedFilepaths, int maxLines) {
      super(commaDelimitedFilepaths, maxLines);
    }

    @Override
    public void setStreamContext(StreamContext context) {
      // LocalCatStream has no Solr core to pull from the context
    }

    /**
     * Resolves each comma-separated path as-is (no sandbox chroot) and fails fast with a
     * BAD_REQUEST when a path does not exist.
     */
    @Override
    protected List<CrawlFile> validateAndSetFilepathsInSandbox(String commaDelimitedFilepaths) {
      final List<CrawlFile> crawlSeeds = new ArrayList<>();
      for (String crawlRootStr : commaDelimitedFilepaths.split(",")) {
        Path crawlRootPath = Paths.get(crawlRootStr).normalize();

        if (!Files.exists(crawlRootPath)) {
          throw new SolrException(
              SolrException.ErrorCode.BAD_REQUEST,
              "file/directory to stream doesn't exist: " + crawlRootStr);
        }

        crawlSeeds.add(new CrawlFile(crawlRootStr, crawlRootPath));
      }

      return crawlSeeds;
    }
  }
+
+ @SuppressWarnings({"rawtypes"})
+ static String[] getHeadersFromFirstTuple(Tuple tuple) {
+ Set fields = tuple.getFields().keySet();
+ String[] outputHeaders = new String[fields.size()];
+ int i = -1;
+ for (Object o : fields) {
+ outputHeaders[++i] = o.toString();
+ }
+ Arrays.sort(outputHeaders);
+ return outputHeaders;
+ }
+
+ @SuppressWarnings({"rawtypes"})
+ static String listToString(List values, String internalDelim) {
+ StringBuilder buf = new StringBuilder();
+ for (Object value : values) {
+ if (buf.length() > 0) {
+ buf.append(internalDelim);
+ }
+
+ buf.append(value.toString());
+ }
+
+ return buf.toString();
+ }
+
  /** Thin wrapper that builds a TupleStream from an already-parsed expression. */
  private static TupleStream constructStream(
      StreamFactory streamFactory, StreamExpression streamExpression) throws IOException {
    return streamFactory.constructStream(streamExpression);
  }
+
+ static String readExpression(LineNumberReader bufferedReader, String[] args)
throws IOException {
+
+ StringBuilder exprBuff = new StringBuilder();
+
+ boolean comment = false;
+ while (true) {
+ String line = bufferedReader.readLine();
+ if (line == null) {
+ break;
+ }
+
+ if (line.indexOf("/*") == 0) {
+ comment = true;
+ continue;
+ }
+
+ if (line.indexOf("*/") == 0) {
+ comment = false;
+ continue;
+ }
+
+ if (comment || line.startsWith("#") || line.startsWith("//")) {
+ continue;
+ }
+
+ // Substitute parameters
+
+ if (line.length() > 0) {
+ for (int i = 1; i < args.length; i++) {
+ String arg = args[i];
+ line = line.replace("$" + i, arg);
+ }
+ }
+
+ exprBuff.append(line);
+ }
+
+ return exprBuff.toString();
+ }
+}
diff --git a/solr/core/src/java/org/apache/solr/handler/CatStream.java
b/solr/core/src/java/org/apache/solr/handler/CatStream.java
index 70ee2b65242..f2515f9b38b 100644
--- a/solr/core/src/java/org/apache/solr/handler/CatStream.java
+++ b/solr/core/src/java/org/apache/solr/handler/CatStream.java
@@ -113,7 +113,8 @@ public class CatStream extends TupleStream implements
Expressible {
@Override
public void open() throws IOException {
- final List<CrawlFile> initialCrawlSeeds =
validateAndSetFilepathsInSandbox();
+ final List<CrawlFile> initialCrawlSeeds =
+ validateAndSetFilepathsInSandbox(this.commaDelimitedFilepaths);
final List<CrawlFile> filesToCrawl = new ArrayList<>();
for (CrawlFile crawlSeed : initialCrawlSeeds) {
@@ -163,7 +164,7 @@ public class CatStream extends TupleStream implements
Expressible {
.withExpression(toExpression(factory).toString());
}
- private List<CrawlFile> validateAndSetFilepathsInSandbox() {
+ protected List<CrawlFile> validateAndSetFilepathsInSandbox(String
commaDelimitedFilepaths) {
final List<CrawlFile> crawlSeeds = new ArrayList<>();
for (String crawlRootStr : commaDelimitedFilepaths.split(",")) {
Path crawlRootPath = chroot.resolve(crawlRootStr).normalize();
diff --git a/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java
b/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java
new file mode 100644
index 00000000000..e91ab9e2d81
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/cli/StreamToolTest.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.cli;
+
+import static org.apache.solr.cli.SolrCLI.findTool;
+import static org.apache.solr.cli.SolrCLI.parseCmdLine;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.LineNumberReader;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.commons.cli.CommandLine;
+import org.apache.solr.client.solrj.SolrRequest;
+import org.apache.solr.client.solrj.SolrResponse;
+import org.apache.solr.client.solrj.io.Tuple;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.util.SecurityJson;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class StreamToolTest extends SolrCloudTestCase {
+
  @BeforeClass
  public static void setupClusterWithSecurityEnabled() throws Exception {
    // Two-node cluster with basic auth enabled so credential handling is exercised.
    configureCluster(2).withSecurityJson(SecurityJson.SIMPLE).configure();
  }

  /** Attaches the test user's basic-auth credentials to a Solr request. */
  private <T extends SolrRequest<? extends SolrResponse>> T withBasicAuth(T req) {
    req.setBasicAuthCredentials(SecurityJson.USER, SecurityJson.PASS);
    return req;
  }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testGetHeaderFromFirstTuple() {
+ Tuple tuple = new Tuple(new HashMap());
+ tuple.put("field1", "blah");
+ tuple.put("field2", "blah");
+ tuple.put("field3", "blah");
+
+ String[] headers = StreamTool.getHeadersFromFirstTuple(tuple);
+
+ assertEquals(headers.length, 3);
+ assertEquals(headers[0], "field1");
+ assertEquals(headers[1], "field2");
+ assertEquals(headers[2], "field3");
+ }
+
+ @Test
+ public void testGetOutputFields() {
+ String[] args =
+ new String[] {
+ "--fields", "field9, field2, field3, field4",
+ };
+ StreamTool streamTool = new StreamTool();
+ CommandLine cli = SolrCLI.processCommandLineArgs(streamTool, args);
+ String[] outputFields = StreamTool.getOutputFields(cli);
+ assert outputFields != null;
+ assertEquals(outputFields.length, 4);
+ assertEquals(outputFields[0], "field9");
+ assertEquals(outputFields[1], "field2");
+ assertEquals(outputFields[2], "field3");
+ assertEquals(outputFields[3], "field4");
+ }
+
+ @Test
+ public void testReadExpression() throws Exception {
+ // This covers parameter substitution and expanded comments support.
+
+ String[] args = {"file.expr", "one", "two", "three"};
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter buf = new PrintWriter(stringWriter);
+ buf.println("/*");
+ buf.println("Multi-line comment Comment...");
+ buf.println("*/");
+ buf.println("// Single line comment");
+ buf.println("# Single line comment");
+ buf.println("let(a=$1, b=$2,");
+ buf.println("search($3))");
+ buf.println(")");
+
+ String expr = stringWriter.toString();
+
+ LineNumberReader reader = new LineNumberReader(new StringReader(expr));
+ String finalExpression = StreamTool.readExpression(reader, args);
+ // Strip the comment and insert the params in order.
+ assertEquals(finalExpression, "let(a=one, b=two,search(three)))");
+ }
+
+ @Test
+ public void testReadExpression2() throws Exception {
+ // This covers parameter substitution and expanded comments support.
+
+ String[] args = {"file.expr", "id", "desc_s", "desc"};
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter buf = new PrintWriter(stringWriter);
+
+ buf.println("# Try me");
+ buf.println("search(my_collection,q='*:*',fl='$1, $2',sort='id $3')");
+
+ String expr = stringWriter.toString();
+
+ LineNumberReader reader = new LineNumberReader(new StringReader(expr));
+ String finalExpression = StreamTool.readExpression(reader, args);
+ // Strip the comment and insert the params in order.
+ assertEquals(finalExpression, "search(my_collection,q='*:*',fl='id,
desc_s',sort='id desc')");
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testReadStream() throws Exception {
+ StreamTool.StandardInStream inStream = new StreamTool.StandardInStream();
+ List<Tuple> tuples = new ArrayList();
+ try {
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter buf = new PrintWriter(stringWriter);
+
+ buf.println("one two");
+ buf.println("three four");
+ buf.println("five six");
+
+ String expr = stringWriter.toString();
+ ByteArrayInputStream inputStream =
+ new ByteArrayInputStream(expr.getBytes(Charset.defaultCharset()));
+ inStream.setInputStream(inputStream);
+ inStream.open();
+ while (true) {
+ Tuple tuple = inStream.read();
+ if (tuple.EOF) {
+ break;
+ } else {
+ tuples.add(tuple);
+ }
+ }
+
+ } finally {
+ inStream.close();
+ }
+
+ assertEquals(tuples.size(), 3);
+
+ String line1 = tuples.get(0).getString("line");
+ String line2 = tuples.get(1).getString("line");
+ String line3 = tuples.get(2).getString("line");
+
+ assertEquals("one two", line1);
+ assertEquals("three four", line2);
+ assertEquals("five six", line3);
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testLocalCatStream() throws Exception {
+ File localFile = File.createTempFile("topLevel1", ".txt");
+ populateFileWithData(localFile.toPath());
+
+ StreamTool.LocalCatStream catStream =
+ new StreamTool.LocalCatStream(localFile.getAbsolutePath(), -1);
+ List<Tuple> tuples = new ArrayList();
+ try {
+ catStream.open();
+ while (true) {
+ Tuple tuple = catStream.read();
+ if (tuple.EOF) {
+ break;
+ } else {
+ tuples.add(tuple);
+ }
+ }
+
+ } finally {
+ catStream.close();
+ }
+
+ assertEquals(4, tuples.size());
+
+ for (int i = 0; i < 4; i++) {
+ Tuple t = tuples.get(i);
+ assertEquals(localFile.getName() + " line " + (i + 1), t.get("line"));
+ assertEquals(localFile.getAbsolutePath(), t.get("file"));
+ }
+ }
+
+ @Test
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void testListToString() {
+ List stuff = new ArrayList();
+ stuff.add("test1");
+ stuff.add(3);
+ stuff.add(111.32322);
+ stuff.add("test3");
+ String s = StreamTool.listToString(stuff, "|");
+ assertEquals("test1|3|111.32322|test3", s);
+ }
+
+ @Test
+ public void testStdInFailsWithRemoteWorker() throws Exception {
+ String expression = "echo(stdin())";
+
+ String[] args =
+ new String[] {
+ "stream",
+ "-e",
+ "remote",
+ "--name",
+ "fakeCollection",
+ "--verbose",
+ "--zk-host",
+ cluster.getZkClient().getZkServerAddress(),
+ expression
+ };
+ assertEquals(1, runTool(args));
+ }
+
+ @Test
+ public void testStdInSucceedsWithLocalWorker() throws Exception {
+ String expression = "echo(stdin())";
+
+ String[] args =
+ new String[] {
+ "stream",
+ "-e",
+ "local",
+ "-v",
+ "-z",
+ cluster.getZkClient().getZkServerAddress(),
+ expression
+ };
+ assertEquals(0, runTool(args));
+ }
+
+ @Test
+ public void testRunEchoStreamLocally() throws Exception {
+
+ String expression = "echo(Hello)";
+ File expressionFile = File.createTempFile("expression", ".EXPR");
+ FileWriter writer = new FileWriter(expressionFile,
Charset.defaultCharset());
+ writer.write(expression);
+ writer.close();
+
+ // test passing in the file
+ // notice that we do not pass in zkHost or solrUrl for a simple echo run
locally.
+ String[] args = {
+ "stream",
+ "-e",
+ "local",
+ "--verbose",
+ "-zk-host",
+ cluster.getZkClient().getZkServerAddress(),
+ expressionFile.getAbsolutePath()
+ };
+
+ assertEquals(0, runTool(args));
+
+ // test passing in the expression directly
+ args =
+ new String[] {
+ "stream",
+ "--execution",
+ "local",
+ "--verbose",
+ "--zk-host",
+ cluster.getZkClient().getZkServerAddress(),
+ expression
+ };
+
+ assertEquals(0, runTool(args));
+ }
+
  @Test
  public void testRunEchoStreamRemotely() throws Exception {
    String collectionName = "streamWorkerCollection";
    // Remote execution sends the expression to a real collection's /stream handler.
    withBasicAuth(CollectionAdminRequest.createCollection(collectionName, "_default", 1, 1))
        .processAndWait(cluster.getSolrClient(), 10);
    waitForState(
        "Expected collection to be created with 1 shard and 1 replicas",
        collectionName,
        clusterShape(1, 1));

    String expression = "echo(Hello)";
    // Upper-case extension on purpose: the tool matches ".expr" case-insensitively.
    File expressionFile = File.createTempFile("expression", ".EXPR");
    FileWriter writer = new FileWriter(expressionFile, Charset.defaultCharset());
    writer.write(expression);
    writer.close();

    // test passing in the file
    String[] args = {
      "stream",
      "-e",
      "remote",
      "-c",
      collectionName,
      "--verbose",
      "-z",
      cluster.getZkClient().getZkServerAddress(),
      "--credentials",
      SecurityJson.USER_PASS,
      expressionFile.getAbsolutePath()
    };

    assertEquals(0, runTool(args));

    // test passing in the expression directly
    args =
        new String[] {
          "stream",
          "--execution",
          "remote",
          "--name",
          collectionName,
          "--verbose",
          "--zk-host",
          cluster.getZkClient().getZkServerAddress(),
          "--credentials",
          SecurityJson.USER_PASS,
          expression
        };

    assertEquals(0, runTool(args));
  }

  /** Resolves args to a tool, verifies it is StreamTool, runs it, and returns the exit code. */
  private int runTool(String[] args) throws Exception {
    Tool tool = findTool(args);
    assertTrue(tool instanceof StreamTool);
    CommandLine cli = parseCmdLine(tool, args);
    return tool.runTool(cli);
  }

  // Copied from StreamExpressionTest.java
  /** Writes four numbered lines ("&lt;filename&gt; line N") consumed by the cat() assertions. */
  private static void populateFileWithData(Path dataFile) throws Exception {
    try (final BufferedWriter writer = Files.newBufferedWriter(dataFile, StandardCharsets.UTF_8)) {
      for (int i = 1; i <= 4; i++) {
        writer.write(dataFile.getFileName() + " line " + i);
        writer.newLine();
      }
    }
  }
+}
diff --git a/solr/packaging/test/test_stream.bats
b/solr/packaging/test/test_stream.bats
new file mode 100644
index 00000000000..63145522c79
--- /dev/null
+++ b/solr/packaging/test/test_stream.bats
@@ -0,0 +1,86 @@
+#!/usr/bin/env bats
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load bats_helper
+
+setup_file() {
+ common_clean_setup
+ solr start -e techproducts
+ solr auth enable --type basicAuth --credentials name:password
+}
+
+teardown_file() {
+ common_setup
+ solr stop --all
+}
+
+setup() {
+ common_setup
+}
+
+teardown() {
+ # save a snapshot of SOLR_HOME for failed tests
+ save_home_on_failure
+}
+
+@test "searching solr via locally executed streaming expression" {
+
+ local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr"
+ echo 'search(techproducts,' > "${solr_stream_file}"
+ echo 'q="name:memory",' >> "${solr_stream_file}"
+ echo 'fl="name,price",' >> "${solr_stream_file}"
+ echo 'sort="price desc"' >> "${solr_stream_file}"
+ echo ')' >> "${solr_stream_file}"
+
+ run solr stream --execution local --header --credentials name:password
${solr_stream_file}
+
+ assert_output --partial 'name price'
+ assert_output --partial 'CORSAIR XMS'
+ refute_output --partial 'ERROR'
+}
+
+@test "searching solr via remotely executed streaming expression" {
+
+ local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr"
+ echo 'search(techproducts,' > "${solr_stream_file}"
+ echo 'q="name:memory",' >> "${solr_stream_file}"
+ echo 'fl="name,price",' >> "${solr_stream_file}"
+ echo 'sort="price desc"' >> "${solr_stream_file}"
+ echo ')' >> "${solr_stream_file}"
+
+ run solr stream -e remote --name techproducts --solr-url
http://localhost:${SOLR_PORT} --header --credentials name:password
${solr_stream_file}
+
+ assert_output --partial 'name price'
+ assert_output --partial 'CORSAIR XMS'
+ refute_output --partial 'ERROR'
+}
+
+@test "variable interpolation" {
+
+ local solr_stream_file="${BATS_TEST_TMPDIR}/search.expr"
+ echo 'search(techproducts,' > "${solr_stream_file}"
+ echo 'q="name:$1",' >> "${solr_stream_file}"
+ echo 'fl="name,price",' >> "${solr_stream_file}"
+ echo 'sort="price $2"' >> "${solr_stream_file}"
+ echo ')' >> "${solr_stream_file}"
+
+ run solr stream --execution local --header --credentials name:password
${solr_stream_file} apple asc
+
+ assert_output --partial 'name price'
+ assert_output --partial 'Apple 60 GB iPod'
+ refute_output --partial 'ERROR'
+}
diff --git a/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc
b/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc
new file mode 100644
index 00000000000..20fe2458e42
--- /dev/null
+++ b/solr/solr-ref-guide/modules/query-guide/pages/stream-tool.adoc
@@ -0,0 +1,176 @@
+= Stream Tool
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+The Stream tool allows you to run a xref:streaming-expressions.adoc[] in Solr
and see the results from the command line.
+It is very similar to the xref:stream-screen.adoc[], but is part of the
`bin/solr` CLI.
+Being a CLI, you can pipe content into it similar to other Unix-style tools,
and you can actually RUN many kinds of expressions locally as well.
+
+NOTE: The Stream Tool is classified as "experimental".
+It may change in backwards-incompatible ways as it evolves to cover additional
functionality.
+
+To run it, open a terminal and enter:
+
+[,console]
+----
+$ bin/solr stream --header -c techproducts --delimiter=\|
'search(techproducts,q="name:memory",fl="name,price")'
+----
+
+This will run the provided streaming expression on the `techproducts`
collection on your local Solr and produce:
+
+[,console]
+----
+name|price
+CORSAIR XMS 2GB (2 x 1GB) 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) Dual
Channel Kit System Memory - Retail|185.0
+CORSAIR ValueSelect 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System
Memory - Retail|74.99
+A-DATA V-Series 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System
Memory - OEM|
+----
+
+TIP: Notice how we used the pipe character (|) as the delimiter? It required
a backslash to escape it so it wouldn't be interpreted as a pipe by the
shell.
+
+You can also specify a file with the suffix `.expr` containing your streaming
expression.
+This is useful for longer expressions or if you are experiencing shell
character-escaping issues with your expression.
+
+Assuming you have created the file `stream.expr` with the contents:
+
+----
+# Stream a search
+
+search(
+ techproducts,
+ q="name:memory",
+ fl="name,price",
+ sort="price desc"
+)
+----
+
+Then you can run it on the Solr collection `techproducts`, specifying you want
a header row:
+
+[,console]
+----
+$ bin/solr stream --header -c techproducts stream.expr
+----
+
+And this will produce:
+
+[,console]
+----
+name price
+CORSAIR XMS 2GB (2 x 1GB) 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) Dual
Channel Kit System Memory - Retail 185.0
+CORSAIR ValueSelect 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System
Memory - Retail 74.99
+A-DATA V-Series 1GB 184-Pin DDR SDRAM Unbuffered DDR 400 (PC 3200) System
Memory - OEM
+----
+
+== Using the bin/solr stream Tool
+
+To use the tool you need to provide the streaming expression either inline as
the last argument, or provide a file ending in `.expr` that contains the
expression.
+
+The `--help` (or simply `-h`) option will output information on its usage
(i.e., `bin/solr stream --help`):
+
+[source,plain]
+----
+usage: bin/solr stream [--array-delimiter <CHARACTER>] [-c <NAME>]
[--delimiter <CHARACTER>] [-e <ENVIRONMENT>] [-f
+ <FIELDS>] [-h] [--header] [-s <HOST>] [-u <credentials>] [-v] [-z
<HOST>]
+
+List of options:
+ --array-delimiter <CHARACTER> The delimiter multi-valued fields. Default
to using a pipe (|) delimiter.
+ -c,--name <NAME> Name of the specific collection to execute
expression on if the execution is set
+ to 'remote'. Required for 'remote'
execution environment.
+ --delimiter <CHARACTER> The output delimiter. Default to using
three spaces.
+ -e,--execution <ENVIRONMENT> Execution environment is either 'local'
(i.e CLI process) or via a 'remote' Solr
+ server. Default environment is 'remote'.
+ -f,--fields <FIELDS> The fields in the tuples to output.
Defaults to fields in the first tuple of result
+ set.
+ -h,--help Print this message.
+ --header Specify to include a header line.
+ -s,--solr-url <HOST> Base Solr URL, which can be used to
determine the zk-host if that's not known;
+ defaults to: http://localhost:8983.
+ -u,--credentials <credentials> Credentials in the format
username:password. Example: --credentials solr:SolrRocks
+ -v,--verbose Enable verbose command output.
+ -z,--zk-host <HOST> Zookeeper connection string; unnecessary
if ZK_HOST is defined in solr.in.sh;
+ otherwise, defaults to localhost:9983.
+----
+
+== Examples Using bin/solr stream
+
+There are several ways to use `bin/solr stream`.
+This section presents several examples.
+
+=== Executing Expression Locally
+
+Streaming Expressions by default are executed in the Solr cluster.
+However, there are use cases where you want to interact with data in your local
environment, or even run a streaming expression independent of Solr.
+
+The Stream Tool allows you to specify `--execution local` to process the
expression in the Solr CLI's JVM.
+
+However, "local" processing does not imply a networking sandbox.
+Many streaming expressions, such as `search` and `update`, will make network
requests to remote Solr nodes if configured to do so, even in "local" execution
mode.
+
+Assuming you have created the file `load_data.expr` with the contents:
+
+----
+# Index CSV File
+
+update(
+ gettingstarted,
+ parseCSV(
+ cat(./example/exampledocs/books.csv, maxLines=2)
+ )
+)
+----
+
+Running this expression will read in the local file and send the first two
lines to the collection `gettingstarted`.
+
+TIP: Want to send data to a remote Solr? Pass in `--solr-url
http://solr.remote:8983`.
+
+
+[,console]
+----
+$ bin/solr stream --execution local --header load_data.expr
+----
+
+
+The StreamTool adds some Streaming Expressions specifically for local use:
+
+* stdin() lets you pipe data directly into the streaming expression.
+* cat() allows you to read ANY file on your local system. This is
different from the xref:stream-source-reference.adoc#cat[`cat`] operator that
runs in Solr, which only accesses `$SOLR_HOME/userfiles/`.
+
+Caveats:
+
+ * You don't get to use any of the parallelization support that is available
when you run the expression on the cluster.
+ * Anything that requires Solr internals access won't work with the
`--execution local` context.
+
+=== Piping data to an expression
+
+Index a CSV file into `gettingstarted` collection.
+
+[,console]
+----
+$ cat example/exampledocs/books.csv | bin/solr stream -e local
'update(gettingstarted,parseCSV(stdin()))'
+----
+
+=== Variable interpolation
+
+You can do variable interpolation via having `$1`, `$2` etc in your streaming
expression, and then passing those values as arguments.
+
+[,console]
+----
+$ bin/solr stream -c techproducts 'echo("$1")' "Hello World"
+Hello World
+----
+
+This also works when using `.expr` files.
diff --git
a/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
b/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
index a9a6bf564aa..cc3e502a2bc 100644
--- a/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
+++ b/solr/solr-ref-guide/modules/query-guide/pages/streaming-expressions.adoc
@@ -143,3 +143,7 @@ The xref:math-expressions.adoc[] has in depth coverage of
visualization techniqu
=== Stream Screen
* xref:stream-screen.adoc[]: Submit streaming expressions and see results and
parsing explanations.
+
+=== Stream Tool
+
+* xref:stream-tool.adoc[]: Submit streaming expressions and see results via
`bin/solr stream`.
diff --git a/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
b/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
index aa3f0fbade7..973bd80f005 100644
--- a/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
+++ b/solr/solr-ref-guide/modules/query-guide/querying-nav.adoc
@@ -96,3 +96,4 @@
** xref:graph-traversal.adoc[]
** xref:stream-api.adoc[]
** xref:stream-screen.adoc[]
+** xref:stream-tool.adoc[]
diff --git
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
index 45ce93c30c4..7550d1a35c4 100644
---
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
+++
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/SolrClientCache.java
@@ -17,7 +17,6 @@
package org.apache.solr.client.solrj.io;
import java.io.Closeable;
-import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,14 +37,10 @@ import org.apache.solr.client.solrj.impl.SolrClientBuilder;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.URLUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** The SolrClientCache caches SolrClients, so they can be reused by different
TupleStreams. */
public class SolrClientCache implements Closeable {
- private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
// Set the floor for timeouts to 60 seconds.
// Timeouts can be increased by setting the system properties defined below.
private static final int MIN_TIMEOUT = 60000;
@@ -55,6 +50,8 @@ public class SolrClientCache implements Closeable {
private static final int minSocketTimeout =
Math.max(Integer.getInteger(HttpClientUtil.PROP_SO_TIMEOUT,
MIN_TIMEOUT), MIN_TIMEOUT);
+ private String basicAuthCredentials = null; // Only support with the
http2SolrClient
+
private final Map<String, SolrClient> solrClients = new HashMap<>();
private final HttpClient apacheHttpClient;
private final Http2SolrClient http2SolrClient;
@@ -77,6 +74,10 @@ public class SolrClientCache implements Closeable {
this.http2SolrClient = http2SolrClient;
}
+ public void setBasicAuthCredentials(String basicAuthCredentials) {
+ this.basicAuthCredentials = basicAuthCredentials;
+ }
+
public void setDefaultZKHost(String zkHost) {
if (zkHost != null) {
zkHost = zkHost.split("/")[0];
@@ -101,11 +102,12 @@ public class SolrClientCache implements Closeable {
String zkHostNoChroot = zkHost.split("/")[0];
boolean canUseACLs =
Optional.ofNullable(defaultZkHost.get()).map(zkHostNoChroot::equals).orElse(false);
+
final CloudSolrClient client;
if (apacheHttpClient != null) {
client = newCloudLegacySolrClient(zkHost, apacheHttpClient, canUseACLs);
} else {
- client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs);
+ client = newCloudHttp2SolrClient(zkHost, http2SolrClient, canUseACLs,
basicAuthCredentials);
}
solrClients.put(zkHost, client);
return client;
@@ -129,12 +131,17 @@ public class SolrClientCache implements Closeable {
}
private static CloudHttp2SolrClient newCloudHttp2SolrClient(
- String zkHost, Http2SolrClient http2SolrClient, boolean canUseACLs) {
+ String zkHost,
+ Http2SolrClient http2SolrClient,
+ boolean canUseACLs,
+ String basicAuthCredentials) {
final List<String> hosts = List.of(zkHost);
var builder = new CloudHttp2SolrClient.Builder(hosts, Optional.empty());
builder.canUseZkACLs(canUseACLs);
// using internal builder to ensure the internal client gets closed
- builder =
builder.withInternalClientBuilder(newHttp2SolrClientBuilder(null,
http2SolrClient));
+ builder =
+ builder.withInternalClientBuilder(
+ newHttp2SolrClientBuilder(null, http2SolrClient,
basicAuthCredentials));
var client = builder.build();
try {
client.connect();
@@ -163,7 +170,7 @@ public class SolrClientCache implements Closeable {
if (apacheHttpClient != null) {
client = newHttpSolrClient(baseUrl, apacheHttpClient);
} else {
- client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient).build();
+ client = newHttp2SolrClientBuilder(baseUrl, http2SolrClient,
basicAuthCredentials).build();
}
solrClients.put(baseUrl, client);
return client;
@@ -190,7 +197,7 @@ public class SolrClientCache implements Closeable {
}
private static Http2SolrClient.Builder newHttp2SolrClientBuilder(
- String url, Http2SolrClient http2SolrClient) {
+ String url, Http2SolrClient http2SolrClient, String
basicAuthCredentials) {
final var builder =
(url == null || URLUtil.isBaseUrl(url)) // URL may be null here and
set by caller
? new Http2SolrClient.Builder(url)
@@ -199,6 +206,8 @@ public class SolrClientCache implements Closeable {
if (http2SolrClient != null) {
builder.withHttpClient(http2SolrClient);
}
+ builder.withOptionalBasicAuthCredentials(basicAuthCredentials);
+
long idleTimeout = minSocketTimeout;
if (builder.getIdleTimeoutMillis() != null) {
idleTimeout = Math.max(idleTimeout, builder.getIdleTimeoutMillis());
diff --git
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
index 9576cf9658e..fc26a8972f7 100644
---
a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
+++
b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/LetStream.java
@@ -224,4 +224,9 @@ public class LetStream extends TupleStream implements
Expressible {
public int getCost() {
return 0;
}
+
+ @SuppressWarnings({"rawtypes"})
+ public Map getLetParams() {
+ return this.letParams;
+ }
}