mvolikas commented on code in PR #1488:
URL:
https://github.com/apache/incubator-stormcrawler/pull/1488#discussion_r1976646575
##########
external/solr/src/main/java/org/apache/stormcrawler/solr/SolrConnection.java:
##########
@@ -17,88 +17,127 @@
package org.apache.stormcrawler.solr;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
+import org.apache.solr.client.solrj.impl.CloudHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateHttp2SolrClient;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.impl.LBHttp2SolrClient;
+import org.apache.solr.client.solrj.impl.LBSolrClient;
+import org.apache.solr.client.solrj.request.QueryRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
import org.apache.stormcrawler.util.ConfUtils;
public class SolrConnection {
private SolrClient client;
- private UpdateRequest request;
+ private SolrClient updateClient;
- private SolrConnection(SolrClient sc, UpdateRequest r) {
- client = sc;
- request = r;
+ private static boolean cloud;
+ private static String collection;
+
+ private SolrConnection(SolrClient client, SolrClient updateClient) {
+ this.client = client;
+ this.updateClient = updateClient;
}
public SolrClient getClient() {
return client;
}
- public UpdateRequest getRequest() {
- return request;
+ public SolrClient getUpdateClient() {
+ return updateClient;
}
- public static SolrClient getClient(Map stormConf, String boltType) {
+ public CompletableFuture<QueryResponse> requestAsync(QueryRequest request)
{
+ if (cloud) {
+ CloudHttp2SolrClient cloudHttp2SolrClient = (CloudHttp2SolrClient)
client;
+
+ // Get the Solr endpoints
+ Collection<Slice> activeSlices =
+ cloudHttp2SolrClient
+ .getClusterState()
+ .getCollection(collection)
+ .getActiveSlices();
+
+ List<LBSolrClient.Endpoint> endpoints = new ArrayList<>();
+ for (Slice slice : activeSlices) {
+ for (Replica replica : slice.getReplicas()) {
+ if (replica.getState() == Replica.State.ACTIVE) {
+ endpoints.add(new
LBSolrClient.Endpoint(replica.getBaseUrl(), collection));
+ }
+ }
+ }
+
+ // Shuffle the endpoints for basic load balancing
+ Collections.shuffle(endpoints);
+
+ // Get the async client
+ LBHttp2SolrClient lbHttp2SolrClient =
cloudHttp2SolrClient.getLbClient();
+ LBSolrClient.Req req = new LBSolrClient.Req(request, endpoints);
Review Comment:
From
[LBHttp2SolrClient.html#requestAsync](https://solr.apache.org/docs/9_7_0/solrj/org/apache/solr/client/solrj/impl/LBHttp2SolrClient.html#requestAsync(org.apache.solr.client.solrj.impl.LBSolrClient.Req)):
> Execute an asynchronous request against one or more hosts for a given
> collection. The passed-in Req object includes a List of Endpoints. This method
> always begins with the first Endpoint in the list and if unsuccessful tries
> each in turn until the request is successful. Consequently, this method does
> not actually Load Balance. It is up to the caller to shuffle the List of
> Endpoints if Load Balancing is desired.

For now, I just shuffle the endpoints randomly before each request, which gives basic load balancing across the active replicas.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]