This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/main by this push:
new 5bb18df Move batching from solr+kafka to just kafka. (#53)
5bb18df is described below
commit 5bb18df43fbab85043dd9b9b30319a4073ccffca
Author: Mark Robert Miller <[email protected]>
AuthorDate: Mon Apr 10 16:13:05 2023 -0500
Move batching from solr+kafka to just kafka. (#53)
* Move batching from solr+kafka to just kafka.
Make consumer multi-threaded.
Fix flaky tests.
Allow multiple collections to use the same topic on the same consumer.
Consumer can subscribe to multiple topics.
Default to a hard fail for the client when an update is too large for the Kafka queue.
* Few minor updates.
* Add 22 to absolutely ensure unique id.
---
.github/workflows/manual-test.yml | 44 +++
CROSSDC.md | 203 +++---------
SolrAndKafkaIntegrationTest.java | 335 ++++++++++++++++++++
build.gradle | 2 +-
.../solr/crossdc/common/KafkaCrossDcConf.java | 12 +-
.../common/MirroredSolrRequestSerializer.java | 73 ++---
.../org/apache/solr/crossdc/consumer/Consumer.java | 22 +-
.../crossdc/consumer/KafkaCrossDcConsumer.java | 341 ++++++++++++++++-----
.../crossdc/consumer/KafkaCrossDcConsumerTest.java | 151 +++++++++
crossdc-producer/build.gradle | 5 +-
.../update/processor/MirroringUpdateProcessor.java | 102 +++---
.../MirroringUpdateRequestProcessorFactory.java | 17 +-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 149 ++++-----
...SolrAndKafkaMultiCollectionIntegrationTest.java | 222 ++++++++++++++
.../solr/crossdc/SolrAndKafkaReindexTest.java | 42 +--
.../solr/crossdc/ZkConfigIntegrationTest.java | 17 +-
.../processor/MirroringUpdateProcessorTest.java | 177 +++++++++++
crossdc-producer/src/test/resources/log4j2.xml | 10 +-
gradlew | 0
19 files changed, 1463 insertions(+), 461 deletions(-)
diff --git a/.github/workflows/manual-test.yml b/.github/workflows/manual-test.yml
new file mode 100644
index 0000000..0d217a7
--- /dev/null
+++ b/.github/workflows/manual-test.yml
@@ -0,0 +1,44 @@
+name: SolrJ Tests
+
+on:
+ workflow_dispatch
+
+concurrency:
+ group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}'
+ cancel-in-progress: true
+
+
+jobs:
+ build:
+ container:
+ image: ubuntu:latest
+ options: --network bridge
+ test:
+ name: Run SolrJ Tests
+ runs-on: ubuntu-latest
+ steps:
+ # Setup
+ - uses: actions/checkout@v2
+ - name: Set up JDK 11
+ uses: actions/setup-java@v2
+ with:
+ distribution: 'temurin'
+ java-version: 11
+ java-package: jdk
+ - name: Grant execute permission for scripts
+ run: chmod +x cluster.sh;chmod +x cluster-stop.sh;chmod +x manual-test.sh
+ - name: Grant execute permission for gradlew
+ run: chmod +x gradlew
+ - name: Build
+ run: ./gradlew build
+
+# - uses: actions/cache@v2
+# with:
+# path: |
+# ~/.gradle/caches
+# key: ${{ runner.os }}-gradle-solrj-${{ hashFiles('versions.lock') }}
+# restore-keys: |
+# ${{ runner.os }}-gradle-solrj-
+# ${{ runner.os }}-gradle-
+ - name: Init cluster
+ run: ./manual-test.sh
diff --git a/CROSSDC.md b/CROSSDC.md
index 5c528f4..a0d5d75 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -1,199 +1,84 @@
# Solr Cross DC: Getting Started
-**A simple cross-data-center fail-over solution for Apache Solr.**
-
-
-
-[TOC]
+Solr Cross DC is a simple cross-data-center fail-over solution for Apache
Solr. It has three key components: the CrossDC Producer, the CrossDC Consumer,
and Apache Kafka. The Producer is a Solr UpdateProcessor plugin that forwards
updates from the primary data center, while the Consumer is an update request
consumer application that receives updates in the backup data center. Kafka is
the distributed queue that connects the two.
## Overview
-The design for this feature involves three key components:
+Solr Cross DC is designed to provide a simple and reliable way to replicate
Solr updates across multiple data centers. It is particularly useful for
organizations that need to ensure high availability and disaster recovery for
their Solr clusters.
-- A UpdateProccessor plugin for Solr to forward updates from the primary data
center.
-- An update request consumer application to receive updates in the backup data
center.
-- A distributed queue to connect the above two.
-
-The UpdateProcessor plugin is called the CrossDC Producer, the consumer
application is called the CrossDC Consumer, and the supported distributed queue
application is Apache Kafka.
+The CrossDC Producer intercepts updates when the node acts as the leader and
puts those updates onto the distributed queue. The CrossDC Consumer polls the
distributed queue and forwards updates to the configured Solr cluster upon
receiving the update requests.
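In outline, the flow the two components implement can be sketched as a tiny produce/poll-and-forward loop. This is an illustrative sketch only: an in-memory queue stands in for Kafka, plain strings stand in for MirroredSolrRequest objects, and the class and method names are invented, not the actual CrossDC APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the CrossDC flow: an in-memory queue stands in
// for Kafka and strings stand in for mirrored update requests.
public class CrossDcFlowSketch {

    // "Producer" side: the leader node puts an intercepted update onto the queue.
    static void mirrorUpdate(Queue<String> queue, String update) {
        queue.add(update);
    }

    // "Consumer" side: drain the queue and forward every update to the
    // backup Solr cluster (here just a list standing in for a Solr client).
    static List<String> pollAndForward(Queue<String> queue, List<String> backupSolr) {
        List<String> forwarded = new ArrayList<>();
        String update;
        while ((update = queue.poll()) != null) {
            backupSolr.add(update); // stand-in for submitting an UpdateRequest
            forwarded.add(update);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        List<String> backupSolr = new ArrayList<>();
        mirrorUpdate(queue, "add doc1");
        mirrorUpdate(queue, "add doc2");
        List<String> forwarded = pollAndForward(queue, backupSolr);
        System.out.println(forwarded.size() + " updates forwarded"); // prints "2 updates forwarded"
    }
}
```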
## Getting Started
-To use Solr Cross DC, you must complete the following steps:
-
-- Startup or obtain access to an Apache Kafka cluster to provide the
distributed queue between data centers.
-- Install the CrossDC Solr plugin on each of the nodes in your Solr cluster
(in your primary and backup data centers) by placing the jar in the correct
location and configuring solrconfig.xml to reference the new UpdateProcessor
and then configure it for the Kafka cluster.
-- Install the CrossDC consumer application in the backup data center and
configure it for the Kafka cluster and the Solr cluster it will send consumed
updates to.
+To use Solr Cross DC, follow these steps:
-The Solr UpdateProccessor plugin will intercept updates when the node acts as
the leader and then put those updates onto the distributed queue. The CrossDC
Consumer application will poll the distributed queue and forward updates on to
the configured Solr cluster upon receiving the update requests.
+1. Startup or obtain access to an Apache Kafka cluster to provide the
distributed queue between data centers.
+2. Install the CrossDC Solr plugin on each node in your Solr cluster (in both primary and backup data centers). Place the jar in the sharedLib directory specified in solr.xml, then configure solrconfig.xml to reference the new UpdateProcessor and configure it for the Kafka cluster.
+3. Install the CrossDC consumer application in the backup data center and
configure it for the Kafka cluster and the Solr cluster it will send consumed
updates to.
### Configuration and Startup
-The current configuration options are entirely minimal. Further configuration
options will be added over time. At this early stage, some may also change.
-
#### Installing and Configuring the Cross DC Producer Solr Plug-In
-1. Configure the sharedLib directory in solr.xml (eg sharedLIb=lib) and place
the CrossDC producer plug-in jar file into the specified folder. It's not
advisable to attempt to use the per SolrCore instance directory lib folder as
you would have to duplicate the plug-in many times and manage it when creating
new collections or adding replicas or shards.
-
-
-**solr.xml**
+1. Configure the sharedLib directory in solr.xml (e.g., sharedLib=lib) and place the CrossDC producer plug-in jar file into the specified folder.
+ **solr.xml**
```xml
<solr>
<str name="sharedLib">${solr.sharedLib:}</str>
```
-
-
-
-2. Configure the new UpdateProcessor in solrconfig.xml
-
- **NOTE:** `The following is not the recommended configuration approach in
production, see the information on central configuration below!`
-
-
-
-**solrconfig.xml**
-
- ```xml
- <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
-
- <processor
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
- <str name="bootstrapServers">${bootstrapServers:}</str>
- <str name="topicName">${topicName:}</str>
- </processor>
-
- <processor class="solr.LogUpdateProcessorFactory" />
- <processor class="solr.RunUpdateProcessorFactory" />
- </updateRequestProcessorChain>
- ```
-
-Notice that this update chain has been declared to be the default chain used.
-
-
-
-##### Configuration Properties
-
-There are two configuration properties. You can specify them directly, or use
the above notation to allow them to specified via system property (generally
configured for Solr in the bin/solr.in.sh file).
-
- ```
- bootstrapServers
- ```
-
-The list of servers used to connect to the Kafka cluster, see
https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
-
- ```
- topicName
- ```
-
-The Kafka topicName used to indicate which Kafka queue the Solr updates will
be pushed on to.
-
-
-
-3. Add an external version constraint UpdateProcessor to the update chain
added to solrconfig.xml to allow user-provided update versions (as opposed to
the two Solr clusters using the independently managed built-in versioning).
-
-
https://solr.apache.org/guide/8_11/update-request-processors.html#general-use-updateprocessorfactories
-
-
https://solr.apache.org/docs/8_1_1/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.html
-
-
+2. Configure the new UpdateProcessor in solrconfig.xml.
+ ```xml
+ <updateRequestProcessorChain name="mirrorUpdateChain" default="true">
+
+ <processor
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+ <str name="bootstrapServers">${bootstrapServers:}</str>
+ <str name="topicName">${topicName:}</str>
+ </processor>
+
+ <processor class="solr.LogUpdateProcessorFactory" />
+ <processor class="solr.RunUpdateProcessorFactory" />
+ </updateRequestProcessorChain>
+ ```
+3. Add an external version constraint UpdateProcessor to the update chain added to solrconfig.xml to allow user-provided update versions (as opposed to the two Solr clusters using independently managed built-in versioning).
+ See
https://solr.apache.org/guide/8_11/update-request-processors.html#general-use-updateprocessorfactories
and
https://solr.apache.org/docs/8_1_1/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.html
4. Start or restart the Solr cluster(s).
+##### Configuration Properties
+There are two configuration properties:
+- `bootstrapServers`: list of servers used to connect to the Kafka cluster
+- `topicName`: Kafka topic name used to indicate which Kafka queue the Solr updates will be pushed onto
#### Installing and Configuring the CrossDC Consumer Application
1. Uncompress the distribution tar or zip file for the CrossDC Consumer into
an appropriate install location on a node in the receiving data center.
-2. You can start the Consumer process via the included shell start script at
bin/crossdc-consumer.
-3. You can configure the CrossDC Consumer via Java system properties pass in
the CROSSDC_CONSUMER_OPTS environment variable, i.e.
CROSSDC_CONSUMER_OPTS="-DbootstrapServers=127.0.0.1:2181
-DzkConnectString=127.0.0.1:2181 -DtopicName=crossdc" bin/crossdc-consumer
-
-The required configuration properties are:
-
+2. Start the Consumer process via the included shell start script at
bin/crossdc-consumer.
+3. Configure the CrossDC Consumer via Java system properties passed in the CROSSDC_CONSUMER_OPTS environment variable.
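For example (host and port values are placeholders; 9092 is Kafka's default broker port and 2181 is Zookeeper's):

```
CROSSDC_CONSUMER_OPTS="-DbootstrapServers=127.0.0.1:9092 -DzkConnectString=127.0.0.1:2181 -DtopicName=crossdc" bin/crossdc-consumer
```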
- *bootstrapServers* - the list of servers used to connect to the Kafka
cluster
https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
+The required configuration properties are:
+- `bootstrapServers`: list of servers used to connect to the Kafka cluster
+- `topicName`: Kafka topic name used to indicate which Kafka queue the Solr updates will be pushed to. This can be a comma-separated list if you would like the Consumer to consume multiple topics.
+- `zkConnectString`: Zookeeper connection string used by Solr to connect to
its Zookeeper cluster in the backup data center
- *topicName* - the Kafka topicName used to indicate which Kafka queue the
Solr updates will be pushed to.
+The following additional configuration properties should either be specified for both the producer and the consumer or in the shared Zookeeper central config properties file, because the Consumer uses a Producer for retries:
- *zkConnectString* - the Zookeeper connection string used by Solr to connect
to its Zookeeper cluster in the backup data center
-
-Additional configuration properties:
-
- *groupId* - the group id to give Kafka for the consumer, default to the
empty string if not specified.
-
-The following additional configuration properties should either be specified
for both the producer and the consumer or in the shared Zookeeper
-central config properties file. This is because the Consumer will use a
Producer for retries.
-
- *batchSizeBytes* - the maximum batch size in bytes for the queue
- *bufferMemoryBytes* - the amount of memory in bytes allocated by the
Producer in total for buffering
- *lingerMs* - the amount of time that the Producer will wait to add to a
batch
- *requestTimeout* - request timeout for the Producer - when used for the
Consumers retry Producer, this should be less than the timeout that will cause
the Consumer to be removed from the group for taking too long.
+- `batchSizeBytes`: maximum batch size in bytes for the queue
+- `bufferMemoryBytes`: memory allocated by the Producer in total for buffering
+- `lingerMs`: amount of time that the Producer will wait to add to a batch
+- `requestTimeout`: request timeout for the Producer; when used for the Consumer's retry Producer, this should be less than the timeout that causes the Consumer to be removed from the group for taking too long
#### Central Configuration Option
-You can optionally manage the configuration centrally in Solr's Zookeeper
cluster by placing a properties file called *crossdc.properties* in the root
Solr Zookeeper znode, eg, */solr/crossdc.properties*. This allows you to
update the configuration in a central location rather than at each
solrconfig.xml in each Solr node and also automatically deals with new Solr
nodes or Consumers to come up without requiring additional configuration.
-
-
-
-Both *bootstrapServers* and *topicName* properties can be put in this file, in
which case you would not have to specify any Kafka configuration in the
solrconfig.xml for the CrossDC Producer Solr plugin. Likewise, for the CrossDC
Consumer application, you would only have to set *zkConnectString* for the
local Solr cluster. Note that the two components will be looking in the
Zookeeper clusters in their respective data center locations.
-
-You can override the properties file location and znode name in Zookeeper
using the system property
*zkCrossDcPropsPath=/path/to/props_file_name.properties*
+You can manage the configuration centrally in Solr's Zookeeper cluster by placing a properties file called *crossdc.properties* in the root Solr Zookeeper znode, e.g., */solr/crossdc.properties*. Both *bootstrapServers* and *topicName* properties can be put in this file. For the CrossDC Consumer application, you would only have to set *zkConnectString* for the local Solr cluster. Note that the two components look in the Zookeeper clusters in their respective data centers. You can override the properties file location and znode name using the system property *zkCrossDcPropsPath=/path/to/props_file_name.properties*.
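A minimal *crossdc.properties* might look like the following (the server addresses and topic name are placeholders):

```
bootstrapServers=kafka1.example.com:9092,kafka2.example.com:9092
topicName=crossdc
```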
#### Making the Cross DC UpdateProcessor Optional in a Common solrconfig.xml
-The simplest and least invasive way to control whether the Cross DC
UpdateProcessor is on or off for a node is to configure the update chain it's
used in to be the default chain or not via Solr's system property configuration
syntax. This syntax takes the form of ${*system_property_name*} and will be
substituted with the value of that system property when the configuration is
parsed. You can specify a default value using the following syntax:
${*system_property_name*:*default_value*}. Y [...]
-
-*Having a separate updateRequestProcessorChain avoids a lot of additional
constraints you have to deal with or consider, now or in the future, when
compared to forcing all Cross DC and non-Cross DC use down a single, required,
common updateRequestProcessorChain.*
-
-Further, any application consuming the configuration with no concern for
enabling Cross DC will not be artificially limited in its ability to define,
manage and use updateRequestProcessorChain's.
-
-The following would enable a system property to safely and non invasively
enable or disable Cross DC for a node:
-
-
-```xml
-<updateRequestProcessorChain name="crossdcUpdateChain"
default="${crossdcEnabled:false}">
- <processor
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
- <bool name="enabled">${enabled:false}</bool>
- </processor>
- <processor class="solr.LogUpdateProcessorFactory" />
- <processor class="solr.RunUpdateProcessorFactory" />
-</updateRequestProcessorChain>
-```
-
-
-
-The above configuration would default to Cross DC being disabled with minimal
impact to any non-Cross DC use, and Cross DC could be enabled by starting Solr
with the system property crossdcEnabled=true.
-
-The last chain to declare it's the default wins, so you can put this at the
bottom of almost any existing solrconfig.xml to create an optional Cross DC
path without having to audit, understand, adapt, or test existing non-Cross DC
paths as other options call for.
-
-The above is the simplest and least obtrusive way to manage an on/off switch
for Cross DC.
-
-**Note:** If your configuration already makes use of update handlers and/or
updates independently specifying different updateRequestProcessorChains, your
solution may end up a bit more sophisticated.
-
-
-
-For situations where you do want to control and enforce a single
updateRequestProcessorChain path for every consumer of the solrconfig.xml, it's
enough to simply use the *enabled* attribute, turning the processor into a NOOP
in the chain.
-
-
-
-```xml
-<updateRequestProcessorChain name="crossdcUpdateChain">
- <processor
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
- <bool name="enabled">${enabled:false}</bool>
- </processor>
- <processor class="solr.LogUpdateProcessorFactory" />
- <processor class="solr.RunUpdateProcessorFactory" />
-</updateRequestProcessorChain>
-```
-
-
+Use the *enabled* attribute; setting it to *false* turns the processor into a NOOP in the chain.
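For reference, a chain using the *enabled* attribute looks like the following (the processor is a NOOP unless Solr is started with the *enabled* system property set to true):

```xml
<updateRequestProcessorChain name="crossdcUpdateChain">
  <processor class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
    <bool name="enabled">${enabled:false}</bool>
  </processor>
  <processor class="solr.LogUpdateProcessorFactory" />
  <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
```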
## Limitations
-- Delete-By-Query is not officially supported.
-
- - Work-In-Progress: A non-efficient option to issue multiple delete by id
queries using the results of a given standard query.
-
- - Simply forwarding a real Delete-By-Query could also be reasonable if it
is not strictly reliant on not being reordered with other requests.
-
-
-
-## Additional Notes
+- Delete-By-Query converts to DeleteById, which can be much less efficient for
queries matching large numbers of documents.
+ Forwarding the real Delete-By-Query could also be reasonable when it does not depend on strict ordering relative to other requests.
-In these early days, it may help to reference the *cluster.sh* script located
in the root of the CrossDC repository. This script is a helpful developer tool
for manual testing and it will download Solr and Kafka and then configure both
for Cross DC.
\ No newline at end of file
+It may help to reference the *cluster.sh* script located in the root of the CrossDC repository. This script is a helpful developer tool for manual testing; it will download Solr and Kafka and then configure both for Cross DC.
\ No newline at end of file
diff --git a/SolrAndKafkaIntegrationTest.java b/SolrAndKafkaIntegrationTest.java
new file mode 100644
index 0000000..f935538
--- /dev/null
+++ b/SolrAndKafkaIntegrationTest.java
@@ -0,0 +1,335 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseCloudSolrClient;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+import static org.mockito.Mockito.spy;
+
+@ThreadLeakFilters(defaultFilters = true, filters = {
SolrIgnoredThreadsFilter.class,
+ QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest
extends
+ SolrTestCaseJ4 {
+
+ private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int MAX_DOC_SIZE_BYTES =
Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE);
+
+ static final String VERSION_FIELD = "_version_";
+
+ private static final int NUM_BROKERS = 1;
+ public EmbeddedKafkaCluster kafkaCluster;
+
+ protected volatile MiniSolrCloudCluster solrCluster1;
+ protected volatile MiniSolrCloudCluster solrCluster2;
+
+ protected static volatile Consumer consumer;
+
+ private static String TOPIC = "topic1";
+
+ private static String COLLECTION = "collection1";
+ private static String ALT_COLLECTION = "collection2";
+
+ @Before
+ public void beforeSolrAndKafkaIntegrationTest() throws Exception {
+ consumer = new Consumer();
+ Properties config = new Properties();
+ //config.put("unclean.leader.election.enable", "true");
+ //config.put("enable.partition.eof", "false");
+
+ kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+ public String bootstrapServers() {
+ return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+ }
+ };
+ kafkaCluster.start();
+
+ kafkaCluster.createTopic(TOPIC, 1, 1);
+
+ System.setProperty("topicName", TOPIC);
+ System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+ System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+
+ solrCluster1 = new SolrCloudTestCase.Builder(1,
createTempDir()).addConfig("conf",
+
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+ solrCluster2 = new SolrCloudTestCase.Builder(1,
createTempDir()).addConfig("conf",
+
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+ CollectionAdminRequest.Create create2 =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster2.getSolrClient().request(create2);
+ solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+ String bootstrapServers = kafkaCluster.bootstrapServers();
+ log.info("bootstrapServers={}", bootstrapServers);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES,
MAX_DOC_SIZE_BYTES);
+ consumer.start(properties);
+
+ }
+
+ @After
+ public void afterSolrAndKafkaIntegrationTest() throws Exception {
+ ObjectReleaseTracker.clear();
+
+ if (solrCluster1 != null) {
+ solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster1.shutdown();
+ }
+ if (solrCluster2 != null) {
+ solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster2.shutdown();
+ }
+
+ consumer.shutdown();
+ consumer = null;
+
+ try {
+ //kafkaCluster.deleteAllTopicsAndWait(10000);
+ kafkaCluster.stop();
+ kafkaCluster = null;
+ } catch (Exception e) {
+ log.error("Exception stopping Kafka cluster", e);
+ }
+
+
+ }
+
+// public void testFullCloudToCloud() throws Exception {
+// CloudSolrClient client = solrCluster1.getSolrClient();
+// SolrInputDocument doc = new SolrInputDocument();
+// doc.addField("id", String.valueOf(System.currentTimeMillis()));
+// doc.addField("text", "some test");
+//
+// client.add(doc);
+//
+// client.commit(COLLECTION);
+//
+// System.out.println("Sent producer record");
+//
+// assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+// }
+//
+ private static SolrInputDocument getDoc() {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.nanoTime()));
+ doc.addField("text", "some test");
+ return doc;
+ }
+//
+// public void testProducerToCloud() throws Exception {
+// Properties properties = new Properties();
+// properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
+// properties.put("acks", "all");
+// properties.put("retries", 1);
+// properties.put("batch.size", 1);
+// properties.put("buffer.memory", 33554432);
+// properties.put("linger.ms", 1);
+// properties.put("key.serializer", StringSerializer.class.getName());
+// properties.put("value.serializer",
MirroredSolrRequestSerializer.class.getName());
+// Producer<String, MirroredSolrRequest> producer = new
KafkaProducer(properties);
+// UpdateRequest updateRequest = new UpdateRequest();
+// updateRequest.setParam("shouldMirror", "true");
+// updateRequest.add("id", String.valueOf(System.currentTimeMillis()),
"text", "test");
+// updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22),
"text", "test2");
+// updateRequest.setParam("collection", COLLECTION);
+// MirroredSolrRequest mirroredSolrRequest = new
MirroredSolrRequest(updateRequest);
+// System.out.println("About to send producer record");
+// producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest), (metadata,
exception) -> {
+// log.info("Producer finished sending metadata={}, exception={}",
metadata, exception);
+// });
+// producer.flush();
+//
+// System.out.println("Sent producer record");
+//
+// solrCluster2.getSolrClient().commit(COLLECTION);
+//
+// assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 2);
+//
+// producer.close();
+// }
+
+// @Test
+// public void testMirroringUpdateProcessor() throws Exception {
+// final SolrInputDocument tooLargeDoc = new SolrInputDocument();
+// tooLargeDoc.addField("id", "tooLarge-" +
String.valueOf(System.currentTimeMillis()));
+// tooLargeDoc.addField("text", new String(new byte[2 *
MAX_DOC_SIZE_BYTES]));
+// final SolrInputDocument normalDoc = new SolrInputDocument();
+// normalDoc.addField("id", "normalDoc-" +
String.valueOf(System.currentTimeMillis()));
+// normalDoc.addField("text", "Hello world");
+// final List<SolrInputDocument> docsToIndex = new ArrayList<>();
+// docsToIndex.add(normalDoc);
+// docsToIndex.add(tooLargeDoc);
+//
+// final CloudSolrClient cluster1Client = solrCluster1.getSolrClient();
+// try {
+// cluster1Client.add(docsToIndex);
+// } catch (BaseCloudSolrClient.RouteException e) {
+// // expected
+// }
+// cluster1Client.commit(COLLECTION);
+//
+// // Primary and secondary should each only index 'normalDoc'
+// final String normalDocQuery = "id:" +
normalDoc.get("id").getFirstValue();
+// assertCluster2EventuallyHasDocs(COLLECTION, normalDocQuery, 1);
+// assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+// assertClusterEventuallyHasDocs(cluster1Client, COLLECTION,
normalDocQuery, 1);
+// assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, "*:*", 1);
+//
+// // Create new primary+secondary collection where 'tooLarge' docs ARE
indexed on the primary
+// CollectionAdminRequest.Create create =
+// CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1)
+// .withProperty("indexUnmirrorableDocs", "true");
+// try {
+// solrCluster1.getSolrClient().request(create);
+// solrCluster2.getSolrClient().request(create);
+// solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+// solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+//
+// cluster1Client.add(ALT_COLLECTION, docsToIndex);
+// cluster1Client.commit(ALT_COLLECTION);
+//
+// // Primary should have both 'normal' and 'large' docs; secondary
should only have 'normal' doc.
+// assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*",
2);
+// assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
+// assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
+// } finally {
+// CollectionAdminRequest.Delete delete =
+// CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+// solrCluster1.getSolrClient().request(delete);
+// solrCluster2.getSolrClient().request(delete);
+// }
+// }
+
+ private void assertCluster2EventuallyHasDocs(String collection, String
query, int expectedNumDocs) throws Exception {
+ assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection,
query, expectedNumDocs);
+ }
+
+ private void createCollection(CloudSolrClient client,
CollectionAdminRequest.Create createCmd) throws Exception {
+ final String stashedDefault = client.getDefaultCollection();
+ try {
+ //client.setDefaultCollection(null);
+ client.request(createCmd);
+ } finally {
+ //client.setDefaultCollection(stashedDefault);
+ }
+ }
+
+ @Test
+ public void testFullCloudToCloudMultiCollection() throws Exception {
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1,
1);
+
+ try {
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+ solrCluster2.getSolrClient().request(create);
+ solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+
+ CloudSolrClient client = solrCluster1.getSolrClient();
+
+ SolrInputDocument doc1 = getDoc();
+ SolrInputDocument doc2 = getDoc();
+ SolrInputDocument doc3 = getDoc();
+ SolrInputDocument doc4 = getDoc();
+ SolrInputDocument doc5 = getDoc();
+ SolrInputDocument doc6 = getDoc();
+ SolrInputDocument doc7 = getDoc();
+
+ client.add(COLLECTION, doc1);
+ client.add(ALT_COLLECTION, doc2);
+ client.add(COLLECTION, doc3);
+ client.add(COLLECTION, doc4);
+ client.add(ALT_COLLECTION, doc5);
+ client.add(ALT_COLLECTION, doc6);
+ client.add(COLLECTION, doc7);
+
+ client.commit(COLLECTION);
+ client.commit(ALT_COLLECTION);
+
+ System.out.println("Sent producer record");
+
+ assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 3);
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 4);
+
+ } finally {
+ CollectionAdminRequest.Delete delete =
+ CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+ solrCluster1.getSolrClient().request(delete);
+ solrCluster2.getSolrClient().request(delete);
+ }
+ }
+
+
+ private void assertClusterEventuallyHasDocs(SolrClient client, String
collection, String query, int expectedNumDocs) throws Exception {
+ QueryResponse results = null;
+ boolean foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      client.commit(collection);
+      results = client.query(collection, new SolrQuery(query));
+      if (results.getResults().getNumFound() == expectedNumDocs) {
+        foundUpdates = true;
+        break;
+      }
+      Thread.sleep(200);
+    }
+
+ assertTrue("results=" + results, foundUpdates);
+ }
+}
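The assertClusterEventuallyHasDocs helper above retries a commit-and-query cycle until the expected document count appears. That retry-until-visible pattern can be sketched standalone; the names below are illustrative only, not part of the commit:

```java
import java.util.function.LongSupplier;

public class EventuallyAssert {

    // Poll the supplier until it reports the expected count or the attempt
    // budget runs out, sleeping between attempts; returns whether it converged.
    public static boolean eventuallyEquals(LongSupplier numFound, long expected,
                                           int maxAttempts, long sleepMs) {
        for (int i = 0; i < maxAttempts; i++) {
            if (numFound.getAsLong() == expected) {
                return true;
            }
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulate a cluster where the docs become visible on the third poll.
        long[] calls = {0};
        boolean ok = eventuallyEquals(() -> ++calls[0] >= 3 ? 3 : 0, 3, 10, 1);
        System.out.println("converged=" + ok);
    }
}
```

Unlike the test helper in the diff, this sketch returns early once the count matches instead of continuing to poll for the full attempt budget.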
diff --git a/build.gradle b/build.gradle
index 9a38422..25af428 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,5 +28,5 @@ description 'Root for Solr plugins sandbox'
subprojects {
group "org.apache.solr.crossdc"
- group "org.apache.solr.encryption"
+ //group "org.apache.solr.encryption"
}
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 0b45bbb..0dbc160 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import java.util.*;
-import static org.apache.solr.crossdc.common.SensitivePropRedactionUtils.propertyRequiresRedaction;
import static org.apache.solr.crossdc.common.SensitivePropRedactionUtils.redactPropertyIfNecessary;
public class KafkaCrossDcConf extends CrossDcConf {
@@ -34,10 +33,13 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
+ private static final String DEFAULT_INDEX_UNMIRRORABLE_DOCS = "false";
public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
public static final String DEFAULT_NUM_RETRIES = null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC
private static final String DEFAULT_RETRY_BACKOFF_MS = "500";
+ private static final String DEFAULT_CONSUMER_PROCESSING_THREADS = "5";
+
private static final String DEFAULT_DELIVERY_TIMEOUT_MS = "120000";
public static final String DEFAULT_MAX_POLL_RECORDS = "500"; // same default as Kafka
@@ -70,12 +72,16 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String ENABLE_DATA_COMPRESSION = "enableDataCompression";
+ public static final String INDEX_UNMIRRORABLE_DOCS = "indexUnmirrorableDocs";
+
public static final String SLOW_SUBMIT_THRESHOLD_MS = "slowSubmitThresholdMs";
public static final String NUM_RETRIES = "numRetries";
public static final String RETRY_BACKOFF_MS = "retryBackoffMs";
+ public static final String CONSUMER_PROCESSING_THREADS = "consumerProcessingThreads";
+
public static final String DELIVERY_TIMEOUT_MS = "retryBackoffMs";
public static final String FETCH_MIN_BYTES = "fetchMinBytes";
@@ -100,8 +106,6 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String GROUP_ID = "groupId";
-
-
static {
List<ConfigProperty> configProperties = new ArrayList<>(
List.of(new ConfigProperty(TOPIC_NAME), new
ConfigProperty(BOOTSTRAP_SERVERS),
@@ -111,6 +115,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT),
new ConfigProperty(MAX_REQUEST_SIZE_BYTES,
DEFAULT_MAX_REQUEST_SIZE),
new ConfigProperty(ENABLE_DATA_COMPRESSION,
DEFAULT_ENABLE_DATA_COMPRESSION),
+ new ConfigProperty(INDEX_UNMIRRORABLE_DOCS,
DEFAULT_INDEX_UNMIRRORABLE_DOCS),
new ConfigProperty(SLOW_SUBMIT_THRESHOLD_MS,
DEFAULT_SLOW_SEND_THRESHOLD),
new ConfigProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES),
new ConfigProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS),
@@ -121,6 +126,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+ new ConfigProperty(CONSUMER_PROCESSING_THREADS,
DEFAULT_CONSUMER_PROCESSING_THREADS),
new ConfigProperty(MAX_PARTITION_FETCH_BYTES,
DEFAULT_MAX_PARTITION_FETCH_BYTES),
new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
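The ConfigProperty entries above pair each key with an optional default that applies when the supplied configuration omits the key. A hypothetical miniature of that lookup pattern (not Solr's actual API; names are illustrative):

```java
import java.util.Map;

public class ConfigPropertyDemo {

    // A key plus an optional default; resolve() falls back to the default
    // when the config map has no entry for the key.
    record ConfigProperty(String key, String defaultValue) {
        String resolve(Map<String, String> config) {
            return config.getOrDefault(key, defaultValue);
        }
    }

    public static void main(String[] args) {
        ConfigProperty threads = new ConfigProperty("consumerProcessingThreads", "5");
        System.out.println(threads.resolve(Map.of()));                                 // default applies
        System.out.println(threads.resolve(Map.of("consumerProcessingThreads", "8"))); // override wins
    }
}
```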
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 3f0684d..f93c361 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -52,49 +52,51 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
@Override
public MirroredSolrRequest deserialize(String topic, byte[] data) {
Map solrRequest;
+ UpdateRequest updateRequest = new UpdateRequest();
+ try (JavaBinCodec codec = new JavaBinCodec()) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(data);
- JavaBinCodec codec = new JavaBinCodec();
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
+ try {
+ solrRequest = (Map) codec.unmarshal(bais);
- try {
- solrRequest = (Map) codec.unmarshal(bais);
+ if (log.isTraceEnabled()) {
+ log.trace("Deserialized class={} solrRequest={}",
solrRequest.getClass().getName(),
+ solrRequest);
+ }
- if (log.isTraceEnabled()) {
- log.trace("Deserialized class={} solrRequest={}",
solrRequest.getClass().getName(),
- solrRequest);
+ } catch (Exception e) {
+ log.error("Exception unmarshalling JavaBin", e);
+ throw new RuntimeException(e);
}
- } catch (Exception e) {
- log.error("Exception unmarshalling JavaBin", e);
- throw new RuntimeException(e);
- }
-
- UpdateRequest updateRequest = new UpdateRequest();
- List docs = (List) solrRequest.get("docs");
- if (docs != null) {
- updateRequest.add(docs);
- } else {
- updateRequest.add("id", "1");
- updateRequest.getDocumentsMap().clear();
- }
-
- List deletes = (List) solrRequest.get("deletes");
- if (deletes != null) {
- updateRequest.deleteById(deletes);
- }
+ List docs = (List) solrRequest.get("docs");
+ if (docs != null) {
+ updateRequest.add(docs);
+ } else {
+ updateRequest.add("id", "1"); // TODO huh?
+ updateRequest.getDocumentsMap().clear();
+ }
- List deletesQuery = (List) solrRequest.get("deleteQuery");
- if (deletesQuery != null) {
- for (Object delQuery : deletesQuery) {
- updateRequest.deleteByQuery((String) delQuery);
+ List deletes = (List) solrRequest.get("deletes");
+ if (deletes != null) {
+ updateRequest.deleteById(deletes);
}
- }
+ List deletesQuery = (List) solrRequest.get("deleteQuery");
+ if (deletesQuery != null) {
+ for (Object delQuery : deletesQuery) {
+ updateRequest.deleteByQuery((String) delQuery);
+ }
+ }
- Map params = (Map) solrRequest.get("params");
- if (params != null) {
- updateRequest.setParams(ModifiableSolrParams.of(new
MapSolrParams(params)));
+ Map params = (Map) solrRequest.get("params");
+ if (params != null) {
+ updateRequest.setParams(ModifiableSolrParams.of(new
MapSolrParams(params)));
+ }
+ } catch (IOException e) {
+ log.error("Error in deserialize", e);
+ throw new RuntimeException(e);
}
return new MirroredSolrRequest(updateRequest);
@@ -120,12 +122,10 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
try (JavaBinCodec codec = new JavaBinCodec(null)) {
ExposedByteArrayOutputStream baos = new
ExposedByteArrayOutputStream();
- Map map = new HashMap(4);
+ Map map = new HashMap(8);
map.put("params", solrRequest.getParams());
map.put("docs", solrRequest.getDocuments());
- // TODO
- //map.put("deletes", solrRequest.getDeleteByIdMap());
map.put("deletes", solrRequest.getDeleteById());
map.put("deleteQuery", solrRequest.getDeleteQuery());
@@ -134,6 +134,7 @@ public class MirroredSolrRequestSerializer implements Serializer<MirroredSolrReq
return baos.byteArray();
} catch (IOException e) {
+ log.error("Error in serialize", e);
throw new RuntimeException(e);
}
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 08749b0..e4bc732 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -22,7 +22,6 @@ import org.apache.solr.crossdc.common.ConfigProperty;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.SensitivePropRedactionUtils;
-import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.eclipse.jetty.server.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,8 +31,10 @@ import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
@@ -47,6 +48,8 @@ public class Consumer {
private Server server;
private CrossDcConsumer crossDcConsumer;
+ private CountDownLatch startLatch = new CountDownLatch(1);
+
public void start() {
start(new HashMap<>());
@@ -104,25 +107,29 @@ public class Consumer {
//connector.setPort(port);
//server.setConnectors(new Connector[] {connector})
KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
- crossDcConsumer = getCrossDcConsumer(conf);
+ crossDcConsumer = getCrossDcConsumer(conf, startLatch);
// Start consumer thread
log.info("Starting CrossDC Consumer {}", conf);
- /**
- * ExecutorService to manage the cross-dc consumer threads.
- */
ExecutorService consumerThreadExecutor =
Executors.newSingleThreadExecutor();
consumerThreadExecutor.submit(crossDcConsumer);
// Register shutdown hook
Thread shutdownHook = new Thread(() -> System.out.println("Shutting down consumers!"));
Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ try {
+ startLatch.await(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+ }
}
- private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
- return new KafkaCrossDcConsumer(conf);
+ private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
+ return new KafkaCrossDcConsumer(conf, startLatch);
}
public static void main(String[] args) {
@@ -141,7 +148,6 @@ public class Consumer {
* Abstract class for defining cross-dc consumer
*/
public abstract static class CrossDcConsumer implements Runnable {
- SolrMessageProcessor messageProcessor;
abstract void shutdown();
}
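The KafkaCrossDcConsumer in the next file bounds its worker pool with an ArrayBlockingQueue whose offer() delegates to put(). Because ThreadPoolExecutor enqueues tasks via offer(), that override makes submit() block when the queue is full instead of throwing RejectedExecutionException. A standalone sketch of the trick (class and method names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingSubmitDemo {

    // Executor whose queue blocks on offer(), so submit() applies backpressure
    // rather than rejecting tasks when the queue is full.
    static ThreadPoolExecutor newBlockingExecutor(int threads, int capacity) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity) {
            @Override public boolean offer(Runnable r) {
                try {
                    super.put(r); // block until space frees up
                    return true;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        };
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue);
    }

    // Submit more tasks than threads + queue capacity; every submit succeeds
    // because the producer simply waits for room.
    static int runTasks(int threads, int capacity, int tasks) {
        ThreadPoolExecutor executor = newBlockingExecutor(threads, capacity);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.submit(done::incrementAndGet);
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed=" + runTasks(2, 2, 10)); // completed=10
    }
}
```

In the consumer this keeps Kafka polling from outrunning Solr indexing; the cost is that submit() can now block the polling thread, which the commit accepts as its throttling mechanism.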
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 451253b..9ac506a 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -8,7 +8,11 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
import org.apache.solr.crossdc.common.*;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
import org.slf4j.Logger;
@@ -16,35 +20,59 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
+import java.util.*;
+import java.util.concurrent.*;
/**
- * Class to run the consumer thread for Kafka. This also contains the implementation for retries and
- * resubmitting to the queue in case of temporary failures.
+ * Consumes messages from Kafka and mirrors them into a Solr instance. A KafkaConsumer
+ * subscribes to one or more topics and receives ConsumerRecords containing MirroredSolrRequest
+ * objects. The SolrMessageProcessor handles each MirroredSolrRequest and sends the resulting
+ * UpdateRequest to the CloudSolrClient for indexing. A ThreadPoolExecutor handles the update
+ * requests asynchronously. The KafkaCrossDcConsumer also manages offsets, committing them
+ * to Kafka, and can seek to specific offsets for error recovery. The class provides methods
+ * to start and stop the consumer thread.
+ */
public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final MetricRegistry metrics =
SharedMetricRegistries.getOrCreate("metrics");
- private final KafkaConsumer<String, MirroredSolrRequest> consumer;
- private final KafkaMirroringSink kafkaMirroringSink;
+ private final KafkaConsumer<String,MirroredSolrRequest> consumer;
+ private final CountDownLatch startLatch;
+ KafkaMirroringSink kafkaMirroringSink;
private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
- private final String topicName;
+ private final String[] topicNames;
private final SolrMessageProcessor messageProcessor;
private final CloudSolrClient solrClient;
+ private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10) {
+ @Override public boolean offer(Runnable r) {
+ //return super.offer(r);
+ try {
+ super.put(r);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return true;
+ }
+
+ };
+ private final ThreadPoolExecutor executor;
+
+ private final ConcurrentHashMap<TopicPartition,PartitionWork> partitionWorkMap = new ConcurrentHashMap<>();
+
/**
- * @param conf The Kafka consumer configuration
+ * @param conf The Kafka consumer configuration
+ * @param startLatch
*/
- public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
- this.topicName = conf.get(KafkaCrossDcConf.TOPIC_NAME);
+ public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
+ this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
+ this.startLatch = startLatch;
final Properties kafkaConsumerProps = new Properties();
kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
@@ -66,10 +94,17 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
kafkaConsumerProps.putAll(conf.getAdditionalProperties());
+ int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
+ executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
+ @Override public Thread newThread(Runnable r) {
+ Thread t = new Thread(r);
+ t.setName("KafkaCrossDcConsumerWorker");
+ return t;
+ }
+ });
+ executor.prestartAllCoreThreads();
- solrClient =
- new
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
- Optional.empty()).build();
+ solrClient = createSolrClient(conf);
messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest ->
0L);
@@ -78,14 +113,13 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
- this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+ this.kafkaMirroringSink = createKafkaMirroringSink(conf);
log.info("Created Kafka resubmit producer");
}
- public static KafkaConsumer<String, MirroredSolrRequest>
createConsumer(Properties properties) {
- return new KafkaConsumer<>(properties, new StringDeserializer(),
- new MirroredSolrRequestSerializer());
+ public KafkaConsumer<String,MirroredSolrRequest> createConsumer(Properties
properties) {
+ return new KafkaConsumer<>(properties, new StringDeserializer(), new
MirroredSolrRequestSerializer());
}
/**
@@ -95,11 +129,14 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
* 3. Send the request to the MirroredSolrRequestHandler that has the
processing, retry, error handling logic.
*/
@Override public void run() {
- log.info("About to start Kafka consumer thread, topic={}", topicName);
+ log.info("About to start Kafka consumer thread, topics={}",
Arrays.asList(topicNames));
try {
- consumer.subscribe(Collections.singleton(topicName));
+ consumer.subscribe(Arrays.asList((topicNames)));
+
+ log.info("Consumer started");
+ startLatch.countDown();
while (pollAndProcessRequests()) {
//no-op within this loop: everything is done in pollAndProcessRequests
method defined above.
@@ -130,81 +167,113 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
boolean pollAndProcessRequests() {
log.trace("Entered pollAndProcessRequests loop");
try {
- ConsumerRecords<String, MirroredSolrRequest> records =
- consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+ for (TopicPartition partition : partitionWorkMap.keySet()) {
+ checkForOffsetUpdates(partition);
+ }
+
+ ConsumerRecords<String,MirroredSolrRequest> records =
consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+
+ if (log.isTraceEnabled()) {
+ log.trace("poll return {} records", records.count());
+ }
+
+ UpdateRequest solrReqBatch = null;
+
+ ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
+
for (TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords =
- records.records(partition);
+ List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords =
records.records(partition);
+
+ PartitionWork partitionWork = partitionWorkMap.compute(partition, (k,
v) -> {
+ if (v == null) {
+ return new PartitionWork();
+ }
+ return v;
+ });
+ WorkUnit workUnit = new WorkUnit();
+ workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+ partitionWork.partitionQueue.add(workUnit);
try {
- for (ConsumerRecord<String, MirroredSolrRequest> record :
partitionRecords) {
+ ModifiableSolrParams lastParams = null;
+ NamedList lastParamsAsNamedList = null;
+ solrReqBatch = new UpdateRequest();
+ for (ConsumerRecord<String,MirroredSolrRequest> requestRecord :
partitionRecords) {
if (log.isTraceEnabled()) {
- log.trace("Fetched record from topic={} partition={} key={}
value={}", record.topic(),
- record.partition(), record.key(), record.value());
+ log.trace("Fetched record from topic={} partition={} key={}
value={}", requestRecord.topic(), requestRecord.partition(),
requestRecord.key(),
+ requestRecord.value());
}
- IQueueHandler.Result<MirroredSolrRequest> result =
messageProcessor.handleItem(record.value());
- switch (result.status()) {
- case FAILED_RESUBMIT:
- // currently, we use a strategy taken from an earlier working
implementation
- // of just resubmitting back to the queue - note that in rare
cases, this could
- // allow for incorrect update reorders
- if (log.isTraceEnabled()) {
- log.trace("result=failed-resubmit");
- }
- metrics.counter("failed-resubmit").inc();
- kafkaMirroringSink.submit(record.value());
- break;
- case HANDLED:
- // no-op
- if (log.isTraceEnabled()) {
- log.trace("result=handled");
- }
- metrics.counter("handled").inc();
- break;
- case NOT_HANDLED_SHUTDOWN:
- if (log.isTraceEnabled()) {
- log.trace("result=nothandled_shutdown");
- }
- metrics.counter("nothandled_shutdown").inc();
- case FAILED_RETRY:
- log.error("Unexpected response while processing request. We
never expect {}.",
- result.status().toString());
- metrics.counter("failed-retry").inc();
- break;
- default:
- if (log.isTraceEnabled()) {
- log.trace("result=no matching case");
- }
- // no-op
+
+ lastRecord = requestRecord;
+ MirroredSolrRequest req = requestRecord.value();
+ UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
+ ModifiableSolrParams params = solrReq.getParams();
+ if (log.isTraceEnabled()) {
+ log.trace("params={}", params);
+ }
+
+ if (lastParams != null &&
!lastParams.toNamedList().equals(params.toNamedList())) {
+ if (log.isTraceEnabled()) {
+ log.trace("SolrParams have changed, starting new
UpdateRequest, params={}", params);
+ }
+ lastParamsAsNamedList = null;
+ sendBatch(solrReqBatch, lastRecord, workUnit);
+ solrReqBatch = new UpdateRequest();
+ workUnit = new WorkUnit();
+ workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+ partitionWork.partitionQueue.add(workUnit);
+ }
+
+ lastParams = solrReq.getParams();
+ solrReqBatch.setParams(params);
+ if (lastParamsAsNamedList == null) {
+ lastParamsAsNamedList = lastParams.toNamedList();
+ }
+
+ List<SolrInputDocument> docs = solrReq.getDocuments();
+ if (docs != null) {
+ solrReqBatch.add(docs);
+ }
+ List<String> deletes = solrReq.getDeleteById();
+ if (deletes != null) {
+ solrReqBatch.deleteById(deletes);
+ }
+ List<String> deleteByQuery = solrReq.getDeleteQuery();
+ if (deleteByQuery != null) {
+ for (String delByQuery : deleteByQuery) {
+ solrReqBatch.deleteByQuery(delByQuery);
+ }
}
+
}
- updateOffset(partition, partitionRecords);
+
+ sendBatch(solrReqBatch, lastRecord, workUnit);
+
+ checkForOffsetUpdates(partition);
// handleItem sets the thread interrupt, let's exit if there has
been an interrupt set
if (Thread.currentThread().isInterrupted()) {
log.info("Kafka Consumer thread interrupted, shutting down Kafka
consumer.");
return false;
}
- } catch (MirroringException e) {
- // We don't really know what to do here, so it's wiser to just break
out.
- log.error(
- "Mirroring exception occurred while resubmitting to Kafka. We
are going to stop the consumer thread now.",
- e);
- return false;
} catch (WakeupException e) {
log.info("Caught wakeup exception, shutting down
KafkaSolrRequestConsumer.");
return false;
} catch (Exception e) {
- // If there is any exception returned by handleItem, then reset the
offset.
+ // If there is any exception returned by handleItem, don't set the
offset.
if (e instanceof ClassCastException || e instanceof
SerializationException) { // TODO: optional
log.error("Non retryable error", e);
break;
}
- log.warn("Exception occurred in Kafka consumer thread, but we will
continue.", e);
- resetOffsetForPartition(partition, partitionRecords);
+ log.error("Exception occurred in Kafka consumer thread, stopping the
Consumer.", e);
break;
}
}
+
+ for (TopicPartition partition : partitionWorkMap.keySet()) {
+ checkForOffsetUpdates(partition);
+ }
+
} catch (WakeupException e) {
log.info("Caught wakeup exception, shutting down
KafkaSolrRequestConsumer");
return false;
@@ -220,14 +289,94 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
return true;
}
+ public void sendBatch(UpdateRequest solrReqBatch,
ConsumerRecord<String,MirroredSolrRequest> lastRecord, WorkUnit workUnit) {
+ UpdateRequest finalSolrReqBatch = solrReqBatch;
+ Future<?> future = executor.submit(() -> {
+ IQueueHandler.Result<MirroredSolrRequest> result =
messageProcessor.handleItem(new MirroredSolrRequest(finalSolrReqBatch));
+ try {
+ processResult(lastRecord, result);
+ } catch (MirroringException e) {
+ // We don't really know what to do here
+ log.error("Mirroring exception occurred while resubmitting to Kafka.
We are going to stop the consumer thread now.", e);
+ throw new RuntimeException(e);
+ }
+ });
+ workUnit.workItems.add(future);
+ }
+
+ private void checkForOffsetUpdates(TopicPartition partition) {
+ PartitionWork work;
+ while ((work = partitionWorkMap.get(partition)) != null) {
+ WorkUnit workUnit = work.partitionQueue.peek();
+ if (workUnit != null) {
+ for (Future<?> future : workUnit.workItems) {
+ if (!future.isDone()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Future for update is not done topic={}",
partition.topic());
+ }
+ return;
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Future for update is done topic={}", partition.topic());
+ }
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ log.error("Exception resubmitting updates to Kafka, stopping the
Consumer thread", e);
+ work.partitionQueue.poll();
+ throw new RuntimeException("Exception resubmitting updates to
Kafka, stopping the Consumer thread", e);
+ }
+ work.partitionQueue.poll();
+ }
+
+ updateOffset(partition, workUnit.nextOffset);
+
+ }
+ }
+ }
+
+ void processResult(ConsumerRecord<String,MirroredSolrRequest> record,
IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
+ switch (result.status()) {
+ case FAILED_RESUBMIT:
+ if (log.isTraceEnabled()) {
+ log.trace("result=failed-resubmit");
+ }
+ metrics.counter("failed-resubmit").inc();
+ kafkaMirroringSink.submit(record.value());
+ break;
+ case HANDLED:
+ // no-op
+ if (log.isTraceEnabled()) {
+ log.trace("result=handled");
+ }
+ metrics.counter("handled").inc();
+ break;
+ case NOT_HANDLED_SHUTDOWN:
+ if (log.isTraceEnabled()) {
+ log.trace("result=nothandled_shutdown");
+ }
+ metrics.counter("nothandled_shutdown").inc();
+ case FAILED_RETRY:
+ log.error("Unexpected response while processing request. We never
expect {}.", result.status().toString());
+ metrics.counter("failed-retry").inc();
+ break;
+ default:
+ if (log.isTraceEnabled()) {
+ log.trace("result=no matching case");
+ }
+ // no-op
+ }
+ }
+
/**
* Reset the local offset so that the consumer reads the records from Kafka
again.
*
* @param partition The TopicPartition to reset the offset for
* @param partitionRecords PartitionRecords for the specified partition
*/
- private void resetOffsetForPartition(TopicPartition partition,
- List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+ private void resetOffsetForPartition(TopicPartition partition,
List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
if (log.isTraceEnabled()) {
log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
}
@@ -238,32 +387,56 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
/**
* Logs and updates the commit point for the partition that has been
processed.
*
- * @param partition The TopicPartition to update the offset for
- * @param partitionRecords PartitionRecords for the specified partition
+ * @param partition The TopicPartition to update the offset for
+ * @param nextOffset The next offset to commit for this partition.
*/
- private void updateOffset(TopicPartition partition,
- List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
- long nextOffset = partitionRecords.get(partitionRecords.size() -
1).offset() + 1;
-
+ private void updateOffset(TopicPartition partition, long nextOffset) {
if (log.isTraceEnabled()) {
- log.trace("Updated offset for topic={} partition={} to offset={}",
partition.topic(),
- partition.partition(), nextOffset);
+ log.trace("Updated offset for topic={} partition={} to offset={}",
partition.topic(), partition.partition(), nextOffset);
}
consumer.commitSync(Collections.singletonMap(partition, new
OffsetAndMetadata(nextOffset)));
}
+ private static long
getOffsetForPartition(List<ConsumerRecord<String,MirroredSolrRequest>>
partitionRecords) {
+ long nextOffset = partitionRecords.get(partitionRecords.size() -
1).offset() + 1;
+ return nextOffset;
+ }
+
/**
* Shutdown the Kafka consumer by calling wakeup.
*/
public final void shutdown() {
+ consumer.wakeup();
log.info("Shutdown called on KafkaCrossDcConsumer");
try {
+ if (!executor.isShutdown()) {
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+ }
solrClient.close();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while waiting for executor to shutdown");
} catch (Exception e) {
- log.warn("Exception closing Solr client on shutdown");
+ log.warn("Exception closing Solr client on shutdown", e);
}
- consumer.wakeup();
}
+ protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
+ return new
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
Optional.empty()).build();
+ }
+
+ protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf)
{
+ return new KafkaMirroringSink(conf);
+ }
+
+ private static class PartitionWork {
+ private final Queue<WorkUnit> partitionQueue = new LinkedList<>();
+ }
+
+ static class WorkUnit {
+ Set<Future<?>> workItems = new HashSet<>();
+ long nextOffset;
+ }
}
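The pollAndProcessRequests loop above merges consecutive records into a single UpdateRequest while their SolrParams are equal, and flushes the batch via sendBatch() whenever the params change. The grouping logic can be sketched on its own, simplified to plain strings (names here are illustrative, not the commit's API):

```java
import java.util.ArrayList;
import java.util.List;

public class ParamBatcher {

    // Groups consecutive (params, doc) pairs into batches, starting a new
    // batch whenever the params differ from the previous record's params.
    public static List<List<String>> batchByParams(List<String[]> records) {
        List<List<String>> batches = new ArrayList<>();
        String lastParams = null;
        List<String> batch = new ArrayList<>();
        for (String[] rec : records) {
            String params = rec[0];
            String doc = rec[1];
            if (lastParams != null && !lastParams.equals(params)) {
                batches.add(batch);      // flush: params changed, like sendBatch()
                batch = new ArrayList<>();
            }
            batch.add(doc);
            lastParams = params;
        }
        if (!batch.isEmpty()) {
            batches.add(batch);          // flush the trailing batch
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String[]> records = List.of(
            new String[] {"commit=false", "doc1"},
            new String[] {"commit=false", "doc2"},
            new String[] {"commit=true", "doc3"});
        System.out.println(batchByParams(records)); // [[doc1, doc2], [doc3]]
    }
}
```

Batching only while params are stable preserves per-request semantics (e.g. commitWithin or route params) while still collapsing long runs of homogeneous updates into one request.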
diff --git a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
new file mode 100644
index 0000000..9ff2a8e
--- /dev/null
+++ b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -0,0 +1,151 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class KafkaCrossDcConsumerTest {
+
+ private KafkaCrossDcConsumer kafkaCrossDcConsumer;
+ private KafkaConsumer<String,MirroredSolrRequest> kafkaConsumerMock;
+ private CloudSolrClient solrClientMock;
+ private KafkaMirroringSink kafkaMirroringSinkMock;
+
+ private SolrMessageProcessor messageProcessorMock;
+
+ @Before public void setUp() {
+ kafkaConsumerMock = mock(KafkaConsumer.class);
+ solrClientMock = mock(CloudSolrClient.class);
+ kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
+ messageProcessorMock = mock(SolrMessageProcessor.class);
+ KafkaCrossDcConf conf = testCrossDCConf();
+ // Set necessary configurations
+
+ kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, new
CountDownLatch(0)) {
+ @Override public KafkaConsumer<String,MirroredSolrRequest>
createConsumer(Properties properties) {
+ return kafkaConsumerMock;
+ }
+
+ @Override protected CloudSolrClient createSolrClient(KafkaCrossDcConf
conf) {
+ return solrClientMock;
+ }
+
+ @Override protected KafkaMirroringSink
createKafkaMirroringSink(KafkaCrossDcConf conf) {
+ return kafkaMirroringSinkMock;
+ }
+ };
+ }
+
+ private static KafkaCrossDcConf testCrossDCConf() {
+ Map config = new HashMap<>();
+ config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
+ config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(config);
+ return conf;
+ }
+
+ @After public void tearDown() {
+ kafkaCrossDcConsumer.shutdown();
+ }
+
+ @Test public void testRunAndShutdown() throws Exception {
+ // Define the expected behavior of the mocks and set up the test scenario
+ when(kafkaConsumerMock.poll(any())).thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
+
+ ExecutorService consumerThreadExecutor = Executors.newSingleThreadExecutor();
+
+ // Run the test
+ consumerThreadExecutor.submit(kafkaCrossDcConsumer);
+
+ // Run the shutdown method
+ kafkaCrossDcConsumer.shutdown();
+
+ // Verify that the consumer was subscribed with the correct topic names
+ verify(kafkaConsumerMock).subscribe(anyList());
+
+ // Verify that the appropriate methods were called on the mocks
+ verify(kafkaConsumerMock).wakeup();
+ verify(solrClientMock).close();
+
+ consumerThreadExecutor.shutdown();
+ consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ @Test public void testHandleFailedResubmit() throws Exception {
+ // Set up the KafkaCrossDcConsumer
+ KafkaCrossDcConf testConf = testCrossDCConf();
+ KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new CountDownLatch(0)));
+ doNothing().when(consumer).sendBatch(any(UpdateRequest.class), any(ConsumerRecord.class), any(KafkaCrossDcConsumer.WorkUnit.class));
+
+ // Set up the SolrMessageProcessor mock
+ SolrMessageProcessor mockMessageProcessor = mock(SolrMessageProcessor.class);
+ IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
+ when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
+
+ // Mock the KafkaMirroringSink
+ KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class);
+ doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class));
+ consumer.kafkaMirroringSink = mockKafkaMirroringSink;
+
+ // Call the method to test
+ ConsumerRecord<String,MirroredSolrRequest> record = createSampleConsumerRecord();
+ consumer.processResult(record, failedResubmitResult);
+
+ // Verify that the KafkaMirroringSink.submit() method was called
+ verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
+ }
+
+ private ConsumerRecord<String,MirroredSolrRequest> createSampleConsumerRecord() {
+ return new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest());
+ }
+
+ private ConsumerRecords<String,MirroredSolrRequest> createSampleConsumerRecords() {
+ TopicPartition topicPartition = new TopicPartition("sample-topic", 0);
+ List<ConsumerRecord<String,MirroredSolrRequest>> recordsList = new ArrayList<>();
+ recordsList.add(new ConsumerRecord<>("sample-topic", 0, 0, "key", createSampleMirroredSolrRequest()));
+ return new ConsumerRecords<>(Collections.singletonMap(topicPartition, recordsList));
+ }
+
+ private MirroredSolrRequest createSampleMirroredSolrRequest() {
+ // Create a sample MirroredSolrRequest for testing
+ SolrInputDocument solrInputDocument = new SolrInputDocument();
+ solrInputDocument.addField("id", "1");
+ solrInputDocument.addField("title", "Sample title");
+ solrInputDocument.addField("content", "Sample content");
+ UpdateRequest updateRequest = new UpdateRequest();
+ updateRequest.add(solrInputDocument);
+ return new MirroredSolrRequest(updateRequest);
+ }
+}
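The new test above injects its mocks by overriding the consumer's `create*()` factory methods in an anonymous subclass, rather than using reflection or a DI container. A minimal stand-alone sketch of that test seam, using illustrative stand-in types (not the real Kafka/Solr clients), looks like this:

```java
import java.util.Properties;

public class FactorySeamSketch {

    // Stand-ins for the real Kafka/Solr clients (illustrative only).
    interface FakeConsumer { String poll(); }
    interface FakeSolrClient { void close(); }

    static class Worker {
        final FakeConsumer consumer;
        final FakeSolrClient solr;

        Worker(Properties props) {
            // Dependencies are obtained via overridable factories, not inline 'new'.
            this.consumer = createConsumer(props);
            this.solr = createSolrClient();
        }

        protected FakeConsumer createConsumer(Properties props) {
            return () -> "real-record"; // would connect to Kafka in production
        }

        protected FakeSolrClient createSolrClient() {
            return () -> {}; // would build a cloud client in production
        }
    }

    public static void main(String[] args) {
        // A test overrides the factories to return canned fakes.
        Worker testWorker = new Worker(new Properties()) {
            @Override protected FakeConsumer createConsumer(Properties props) {
                return () -> "fake-record";
            }
        };
        System.out.println(testWorker.consumer.poll()); // prints fake-record
    }
}
```

The seam keeps the production constructor unchanged while letting a unit test run the consumer loop with no live Kafka broker or Solr cluster.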
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 4a989e4..9a6e5cd 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -36,15 +36,14 @@ sourceSets {
dependencies {
implementation project(':crossdc-consumer')
implementation project(path: ':crossdc-commons', configuration: 'shadow')
+ testImplementation project(path: ':crossdc-commons')
provided group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
testImplementation 'org.slf4j:slf4j-api'
testImplementation 'org.hamcrest:hamcrest:2.2'
testImplementation 'junit:junit:4.13.2'
- testImplementation('org.mockito:mockito-core:4.3.1', {
- exclude group: "net.bytebuddy", module: "byte-buddy-agent"
- })
+ testImplementation('org.mockito:mockito-inline:4.3.1')
 testImplementation group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
 testImplementation group: 'org.apache.solr', name: 'solr-test-framework', version: '8.11.2'
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 74ee1eb..dc2e561 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -29,10 +29,10 @@ import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
+// review this code for bugs and performance improvements
public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -42,14 +42,12 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 * necessary to prevent circular mirroring between coupled cluster running this processor.
*/
private final boolean doMirroring;
- private final RequestMirroringHandler requestMirroringHandler;
+ final RequestMirroringHandler requestMirroringHandler;
/**
 * The mirrored request starts as null, gets created and appended to at each process() call,
* then submitted on finish().
*/
- private UpdateRequest mirrorRequest;
- private long mirrorRequestBytes;
private final SolrParams mirrorParams;
@@ -57,7 +55,8 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 * Controls whether docs exceeding the max-size (and thus cannot be mirrored) are indexed locally.
*/
private final boolean indexUnmirrorableDocs;
- private final long maxMirroringBatchSizeBytes;
+
+ private final long maxMirroringDocSizeBytes;
/**
@@ -79,7 +78,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
super(next);
this.doMirroring = doMirroring;
this.indexUnmirrorableDocs = indexUnmirrorableDocs;
- this.maxMirroringBatchSizeBytes = maxMirroringBatchSizeBytes;
+ this.maxMirroringDocSizeBytes = maxMirroringBatchSizeBytes;
this.mirrorParams = mirroredReqParams;
this.distribPhase = distribPhase;
this.requestMirroringHandler = requestMirroringHandler;
@@ -88,43 +87,47 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
}
- private UpdateRequest createAndOrGetMirrorRequest() {
- if (mirrorRequest == null) {
- mirrorRequest = new UpdateRequest();
+ UpdateRequest createMirrorRequest() {
+ UpdateRequest mirrorRequest = new UpdateRequest();
mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
- mirrorRequestBytes = 0L;
- }
- if (log.isDebugEnabled())
- log.debug("createOrGetMirrorRequest={}", mirrorRequest);
- return mirrorRequest;
+ return mirrorRequest;
}
@Override public void processAdd(final AddUpdateCommand cmd) throws IOException {
-
+ UpdateRequest mirrorRequest = createMirrorRequest();
final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
- final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringBatchSizeBytes;
+ log.info("estimated doc size is {} bytes, max size is {}", estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+ final boolean tooLargeForKafka = estimatedDocSizeInBytes > maxMirroringDocSizeBytes;
 if (tooLargeForKafka && !indexUnmirrorableDocs) {
- log.warn("Skipping indexing of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable.", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
- } else {
- super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+ throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update exceeds the doc-size limit and is unmirrorable. id="
+
+ + cmd.getPrintableId() + " doc size=" + estimatedDocSizeInBytes + " maxDocSize=" + maxMirroringDocSizeBytes);
+ } else if (tooLargeForKafka) {
+ log.warn(
+ "Skipping mirroring of doc {} as it exceeds the doc-size limit ({} bytes) and is unmirrorable. doc size={}",
+ cmd.getPrintableId(), maxMirroringDocSizeBytes, estimatedDocSizeInBytes);
 }
+ super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+
// submit only from the leader shards so we mirror each doc once
 boolean isLeader = isLeader(cmd.getReq(), cmd.getIndexedIdStr(), null, cmd.getSolrInputDocument());
- if (doMirroring && isLeader) {
- if (tooLargeForKafka) {
- log.error("Skipping mirroring of doc {} because estimated size exceeds batch size limit {} bytes", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
- } else {
- createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, cmd.overwrite);
- mirrorRequestBytes += estimatedDocSizeInBytes;
+ if (!tooLargeForKafka && doMirroring && isLeader) {
+
+ mirrorRequest.add(doc, cmd.commitWithin, cmd.overwrite);
+
+ try {
+ requestMirroringHandler.mirror(mirrorRequest);
+ } catch (Exception e) {
+ log.error("mirror submit failed", e);
+ throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
}
if (log.isDebugEnabled())
- log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
+ log.debug("processAdd isLeader={} doMirroring={} tooLargeForKafka={} cmd={}", isLeader, doMirroring, tooLargeForKafka, cmd);
}
@Override public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
@@ -175,12 +178,21 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
if (doMirroring) {
boolean isLeader = false;
+ UpdateRequest mirrorRequest = createMirrorRequest();
if (cmd.isDeleteById()) {
 // deleteById requests runs once per leader, so we just submit the request from the leader shard
 isLeader = isLeader(cmd.getReq(), ((DeleteUpdateCommand)cmd).getId(), null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(ShardParams._ROUTE_), null);
if (isLeader) {
- createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip versions from deletes
+
+ mirrorRequest.deleteById(cmd.getId()); // strip versions from deletes
+
+ try {
+ requestMirroringHandler.mirror(mirrorRequest);
+ } catch (Exception e) {
+ log.error("mirror submit failed", e);
+ throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+ }
}
if (log.isDebugEnabled())
 log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, isLeader, cmd);
@@ -191,7 +203,14 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 // TODO: Can we actually support this considering DBQs aren't versioned.
if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
- createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+ mirrorRequest.deleteByQuery(cmd.query);
+
+ try {
+ requestMirroringHandler.mirror(mirrorRequest);
+ } catch (Exception e) {
+ log.error("mirror submit failed", e);
+ throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+ }
}
if (log.isDebugEnabled())
log.debug("processDelete doMirroring={} cmd={}", true, cmd);
@@ -214,7 +233,7 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
}
}
- private boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
+ boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
 CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
@@ -256,29 +275,6 @@ public class MirroringUpdateProcessor extends UpdateRequestProcessor {
@Override public final void finish() throws IOException {
super.finish();
-
- if (doMirroring && mirrorRequest != null) {
- // We are configured to mirror, but short-circuit on batches we already know will fail (because they cumulatively
- // exceed the mirroring max-size)
- if (mirrorRequestBytes > maxMirroringBatchSizeBytes) {
- final String batchedIds = mirrorRequest.getDocuments().stream()
- .map(doc -> doc.getField("id").getValue().toString())
- .collect(Collectors.joining(", "));
- log.warn("Mirroring skipped for request because batch size {} bytes exceeds limit {} bytes. IDs: {}",
- mirrorRequestBytes, maxMirroringBatchSizeBytes, batchedIds);
- mirrorRequest = null;
- mirrorRequestBytes = 0L;
- return;
- }
-
- try {
- requestMirroringHandler.mirror(mirrorRequest);
- mirrorRequest = null; // so we don't accidentally submit it again
- } catch (Exception e) {
- log.error("mirror submit failed", e);
- throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
- }
- }
}
// package private for testing
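The net effect of this hunk: the processor no longer accumulates a batch (`mirrorRequest` / `mirrorRequestBytes` are gone), and each update is size-checked and handed to the mirroring handler immediately, with an oversized update becoming a hard client error unless `indexUnmirrorableDocs` is set. A rough stand-alone sketch of that control flow, with illustrative stand-in names rather than the real Solr API:

```java
import java.util.ArrayList;
import java.util.List;

public class PerUpdateMirrorSketch {

    interface MirrorHandler { void mirror(String doc) throws Exception; }

    static final class TooLargeException extends RuntimeException {
        TooLargeException(String msg) { super(msg); }
    }

    static void processAdd(String doc, long maxDocSizeBytes,
                           boolean indexUnmirrorableDocs, MirrorHandler handler) {
        long estimatedSize = doc.getBytes().length; // stand-in for a size estimator
        boolean tooLarge = estimatedSize > maxDocSizeBytes;
        if (tooLarge && !indexUnmirrorableDocs) {
            // Hard-fail back to the client instead of silently dropping the update.
            throw new TooLargeException("doc size=" + estimatedSize + " max=" + maxDocSizeBytes);
        }
        if (!tooLarge) {
            try {
                handler.mirror(doc); // one submit per update; batching now happens in Kafka
            } catch (Exception e) {
                throw new RuntimeException("mirror submit failed", e);
            }
        }
        // else: indexed locally only, mirroring skipped with a warning
    }

    public static void main(String[] args) {
        List<String> mirrored = new ArrayList<>();
        processAdd("small", 100, false, mirrored::add);
        System.out.println(mirrored); // [small]
        try {
            processAdd("x".repeat(200), 100, false, mirrored::add);
        } catch (TooLargeException expected) {
            System.out.println("rejected as expected");
        }
    }
}
```

Moving the batching into the Kafka producer keeps the processor stateless across `process()`/`finish()`, which is what allows the cumulative-batch-size bookkeeping removed above to go away.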
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 06f7dba..4f571cf 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -60,9 +60,6 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
 private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public static final NoOpUpdateRequestProcessor NO_OP_UPDATE_REQUEST_PROCESSOR =
- new NoOpUpdateRequestProcessor();
-
// Flag for mirroring requests
public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
@@ -71,7 +68,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
private boolean enabled = true;
- private boolean indexUnmirrorableDocs = false;
+
private KafkaCrossDcConf conf;
private final Map<String,Object> properties = new HashMap<>();
@@ -85,11 +82,6 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
this.enabled = false;
}
- final Boolean indexUnmirrorableDocsArg = args.getBooleanArg("indexUnmirrorableDocs");
- if (indexUnmirrorableDocsArg != null && indexUnmirrorableDocsArg) {
- this.indexUnmirrorableDocs = true;
- }
-
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
String val = args._getStr(configKey.getKey(), null);
if (val != null && !val.isBlank()) {
@@ -210,7 +202,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
       final UpdateRequestProcessor next) {
if (!enabled) {
- return NO_OP_UPDATE_REQUEST_PROCESSOR;
+ return new NoOpUpdateRequestProcessor(next);
}
// if the class fails to initialize
@@ -221,6 +213,7 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
// Check if mirroring is disabled in request params, defaults to true
 boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, true);
 final long maxMirroringBatchSizeBytes = conf.getInt(MAX_REQUEST_SIZE_BYTES);
+ Boolean indexUnmirrorableDocs = conf.getBool(INDEX_UNMIRRORABLE_DOCS);
ModifiableSolrParams mirroredParams = null;
if (doMirroring) {
@@ -256,8 +249,8 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
}
 private static class NoOpUpdateRequestProcessor extends UpdateRequestProcessor {
- NoOpUpdateRequestProcessor() {
- super(null);
+ NoOpUpdateRequestProcessor(UpdateRequestProcessor next) {
+ super(next);
}
}
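Worth noting why `NoOpUpdateRequestProcessor` now takes `next`: the old shared instance was built with `super(null)`, which severed the processor chain whenever mirroring was disabled, so downstream processors never ran. A tiny sketch of that pass-through chain pattern, with illustrative types rather than the real Solr classes:

```java
public class ChainPassThroughSketch {

    // Minimal chain-of-responsibility link, modeled on UpdateRequestProcessor.
    static class Processor {
        final Processor next;
        Processor(Processor next) { this.next = next; }
        void process(StringBuilder out) {
            if (next != null) next.process(out);
        }
    }

    // Terminal link that does the real work (stand-in for actual indexing).
    static class IndexingProcessor extends Processor {
        IndexingProcessor() { super(null); }
        @Override void process(StringBuilder out) { out.append("indexed"); }
    }

    // A no-op link must still forward to 'next' instead of dropping the request.
    static class NoOpProcessor extends Processor {
        NoOpProcessor(Processor next) { super(next); }
    }

    public static void main(String[] args) {
        StringBuilder out = new StringBuilder();
        new NoOpProcessor(new IndexingProcessor()).process(out);
        System.out.println(out); // indexed
    }
}
```

This is also why the static shared instance had to go: the correct `next` link differs per request, so the no-op processor must be constructed fresh each time.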
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 7cba50a..ebdd879 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -14,6 +14,7 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseCloudSolrClient;
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -27,10 +28,7 @@ import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.consumer.Consumer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.sys.Prop;
@@ -44,6 +42,7 @@ import java.util.Map;
import java.util.Properties;
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
import static org.mockito.Mockito.spy;
@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
@@ -53,8 +52,7 @@ import static org.mockito.Mockito.spy;
 private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private static final int MAX_MIRROR_BATCH_SIZE_BYTES = Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
- private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES;
+ private static final int MAX_DOC_SIZE_BYTES = Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE);
static final String VERSION_FIELD = "_version_";
@@ -64,19 +62,24 @@ import static org.mockito.Mockito.spy;
protected static volatile MiniSolrCloudCluster solrCluster1;
protected static volatile MiniSolrCloudCluster solrCluster2;
- protected static volatile Consumer consumer = new Consumer();
+ protected static volatile Consumer consumer;
private static String TOPIC = "topic1";
private static String COLLECTION = "collection1";
private static String ALT_COLLECTION = "collection2";
+ private static Thread.UncaughtExceptionHandler uceh;
@BeforeClass
public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
-
+ uceh = Thread.getDefaultUncaughtExceptionHandler();
+ Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+ log.error("Uncaught exception in thread " + t, e);
+ });
+ consumer = new Consumer();
Properties config = new Properties();
- config.put("unclean.leader.election.enable", "true");
- config.put("enable.partition.eof", "false");
+ //config.put("unclean.leader.election.enable", "true");
+ //config.put("enable.partition.eof", "false");
kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
public String bootstrapServers() {
@@ -89,6 +92,7 @@ import static org.mockito.Mockito.spy;
System.setProperty("topicName", TOPIC);
System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+ System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
 solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
@@ -127,14 +131,6 @@ import static org.mockito.Mockito.spy;
public static void afterSolrAndKafkaIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
- consumer.shutdown();
-
- try {
- kafkaCluster.stop();
- } catch (Exception e) {
- log.error("Exception stopping Kafka cluster", e);
- }
-
if (solrCluster1 != null) {
solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
solrCluster1.shutdown();
@@ -143,21 +139,19 @@ import static org.mockito.Mockito.spy;
solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
solrCluster2.shutdown();
}
- }
- @After
- public void tearDown() throws Exception {
- super.tearDown();
- solrCluster1.getSolrClient().deleteByQuery(COLLECTION, "*:*");
- solrCluster2.getSolrClient().deleteByQuery(COLLECTION, "*:*");
- solrCluster1.getSolrClient().commit(COLLECTION);
- solrCluster2.getSolrClient().commit(COLLECTION);
+ consumer.shutdown();
+ consumer = null;
- // Delete alternate collection in case it was created by any tests.
- if (CollectionAdminRequest.listCollections(solrCluster1.getSolrClient()).contains(ALT_COLLECTION)) {
- solrCluster1.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
- solrCluster2.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
+ try {
+ kafkaCluster.deleteAllTopicsAndWait(5000);
+ kafkaCluster.stop();
+ kafkaCluster = null;
+ } catch (Exception e) {
+ log.error("Exception stopping Kafka cluster", e);
}
+
+ Thread.setDefaultUncaughtExceptionHandler(uceh);
}
public void testFullCloudToCloud() throws Exception {
@@ -175,11 +169,18 @@ import static org.mockito.Mockito.spy;
assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
}
+ private static SolrInputDocument getDoc() {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.nanoTime()));
+ doc.addField("text", "some test");
+ return doc;
+ }
+
public void testProducerToCloud() throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
properties.put("acks", "all");
- properties.put("retries", 0);
+ properties.put("retries", 1);
properties.put("batch.size", 1);
properties.put("buffer.memory", 33554432);
properties.put("linger.ms", 1);
@@ -210,19 +211,25 @@ import static org.mockito.Mockito.spy;
@Test
public void testMirroringUpdateProcessor() throws Exception {
final SolrInputDocument tooLargeDoc = new SolrInputDocument();
- tooLargeDoc.addField("id", "tooLarge-" + String.valueOf(System.currentTimeMillis()));
+ tooLargeDoc.addField("id", System.nanoTime());
tooLargeDoc.addField("text", new String(new byte[2 * MAX_DOC_SIZE_BYTES]));
final SolrInputDocument normalDoc = new SolrInputDocument();
- normalDoc.addField("id", "normalDoc-" + String.valueOf(System.currentTimeMillis()));
+ normalDoc.addField("id", System.nanoTime() + 22);
normalDoc.addField("text", "Hello world");
- final List<SolrInputDocument> docsToIndex = new ArrayList<>();
- docsToIndex.add(tooLargeDoc);
+ List<SolrInputDocument> docsToIndex = new ArrayList<>();
docsToIndex.add(normalDoc);
+ docsToIndex.add(tooLargeDoc);
final CloudSolrClient cluster1Client = solrCluster1.getSolrClient();
- cluster1Client.add(docsToIndex);
+ try {
+ cluster1Client.add(docsToIndex);
+ } catch (BaseCloudSolrClient.RouteException e) {
+ // expected
+ }
cluster1Client.commit(COLLECTION);
+
+
// Primary and secondary should each only index 'normalDoc'
final String normalDocQuery = "id:" + normalDoc.get("id").getFirstValue();
assertCluster2EventuallyHasDocs(COLLECTION, normalDocQuery, 1);
@@ -232,38 +239,42 @@ import static org.mockito.Mockito.spy;
 // Create new primary+secondary collection where 'tooLarge' docs ARE indexed on the primary
CollectionAdminRequest.Create create =
- CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1)
- .withProperty("indexUnmirrorableDocs", "true");
- solrCluster1.getSolrClient().request(create);
- solrCluster2.getSolrClient().request(create);
- solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
- solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
-
- cluster1Client.add(ALT_COLLECTION, docsToIndex);
- cluster1Client.commit(ALT_COLLECTION);
-
- // Primary should have both 'normal' and 'large' docs; secondary should only have 'normal' doc.
- assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 2);
- assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
- assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
-
- // Index batch of docs that will exceed the max mirroring batch size cumulatively (but not individually)
- // Batch consists of 100 docs each roughly 1/100th of the max-batch-size
- docsToIndex.clear();
- for (int i = 0; i < 100; i++) {
- final SolrInputDocument doc = new SolrInputDocument();
- doc.addField("id", "cumulativelyTooLarge-" + System.currentTimeMillis() + "-" + i);
- doc.addField("cumulativelyTooLarge_b", "true");
- doc.addField("text", new String(new byte[MAX_MIRROR_BATCH_SIZE_BYTES / 100]));
- docsToIndex.add(doc);
+ CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1)
+ .withProperty("indexUnmirrorableDocs", "true");
+ try {
+ solrCluster1.getSolrClient().request(create);
+ solrCluster2.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+ solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+ cluster1Client.add(ALT_COLLECTION, docsToIndex);
+ cluster1Client.commit(ALT_COLLECTION);
+
+ // try adding another doc
+// final SolrInputDocument newDoc = new SolrInputDocument();
+//
+// newDoc.addField("id", System.nanoTime());
+// newDoc.addField("text", "Hello world");
+// docsToIndex = new ArrayList<>();
+// docsToIndex.add(newDoc);
+//
+// try {
+// cluster1Client.add(ALT_COLLECTION, docsToIndex);
+// } catch (BaseCloudSolrClient.RouteException e) {
+// // expected
+// }
+// cluster1Client.commit(ALT_COLLECTION);
+
+ // Primary should have both 'normal' and 'large' docs; secondary should only have 'normal' doc.
+ assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 2);
+ assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
+ assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
+ } finally {
+ CollectionAdminRequest.Delete delete =
+ CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+ solrCluster1.getSolrClient().request(delete);
+ solrCluster2.getSolrClient().request(delete);
}
- cluster1Client.add(ALT_COLLECTION, docsToIndex);
- cluster1Client.commit(ALT_COLLECTION);
-
- final String cumulativelyTooLargeQuery = "cumulativelyTooLarge_b:true";
- // Primary (but not secondary) should have 100 additional docs
- assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, cumulativelyTooLargeQuery, 100);
- assertCluster2EventuallyHasDocs(ALT_COLLECTION, cumulativelyTooLargeQuery, 0);
}
 private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
@@ -283,13 +294,13 @@ import static org.mockito.Mockito.spy;
 private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 20; i++) {
client.commit(collection);
results = client.query(collection, new SolrQuery(query));
if (results.getResults().getNumFound() == expectedNumDocs) {
foundUpdates = true;
} else {
- Thread.sleep(100);
+ Thread.sleep(200);
}
}
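The `assertClusterEventuallyHasDocs` change above trades 100 polls at 100ms for 20 polls at 200ms. The general shape of that eventually-consistent assertion, extracted as a stand-alone helper (illustrative, not the test framework's API), is:

```java
import java.util.function.Supplier;

public class EventuallySketch {

    // Poll a condition until it holds or the attempt budget runs out.
    static boolean eventually(Supplier<Boolean> condition, int attempts, long sleepMs)
            throws InterruptedException {
        for (int i = 0; i < attempts; i++) {
            if (condition.get()) return true; // condition met, stop polling
            Thread.sleep(sleepMs);
        }
        return false; // gave up after roughly attempts * sleepMs total wait
    }

    public static void main(String[] args) throws InterruptedException {
        long deadline = System.currentTimeMillis() + 50;
        // Simulates a doc count that becomes correct after ~50ms.
        boolean ok = eventually(() -> System.currentTimeMillis() >= deadline, 20, 10);
        System.out.println(ok); // true
    }
}
```

Returning early on success is a small improvement over the loop in the diff, which keeps re-committing and re-querying even after the expected count has been reached.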
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
new file mode 100644
index 0000000..8027aae
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -0,0 +1,222 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaMultiCollectionIntegrationTest extends SolrTestCaseJ4 {
+
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int MAX_DOC_SIZE_BYTES = Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE);
+
+ static final String VERSION_FIELD = "_version_";
+
+ private static final int NUM_BROKERS = 1;
+ public EmbeddedKafkaCluster kafkaCluster;
+
+ protected volatile MiniSolrCloudCluster solrCluster1;
+ protected volatile MiniSolrCloudCluster solrCluster2;
+
+ protected static volatile Consumer consumer;
+
+ private static String TOPIC = "topic1";
+
+ private static String COLLECTION = "collection1";
+ private static String ALT_COLLECTION = "collection2";
+
+ @Before
+ public void beforeSolrAndKafkaIntegrationTest() throws Exception {
+ consumer = new Consumer();
+ Properties config = new Properties();
+ //config.put("unclean.leader.election.enable", "true");
+ //config.put("enable.partition.eof", "false");
+
+ kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+ public String bootstrapServers() {
+ return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+ }
+ };
+ kafkaCluster.start();
+
+ kafkaCluster.createTopic(TOPIC, 1, 1);
+
+ System.setProperty("topicName", TOPIC);
+ System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+ System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+
+ solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+     getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+ solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf",
+     getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+ CollectionAdminRequest.Create create2 =
+ CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+ solrCluster2.getSolrClient().request(create2);
+ solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+ solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+ String bootstrapServers = kafkaCluster.bootstrapServers();
+ log.info("bootstrapServers={}", bootstrapServers);
+
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES);
+ consumer.start(properties);
+
+ }
+
+ @After
+ public void afterSolrAndKafkaIntegrationTest() throws Exception {
+ ObjectReleaseTracker.clear();
+
+ if (solrCluster1 != null) {
+ solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster1.shutdown();
+ }
+ if (solrCluster2 != null) {
+ solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster2.shutdown();
+ }
+
+ consumer.shutdown();
+ consumer = null;
+
+ try {
+ //kafkaCluster.deleteAllTopicsAndWait(10000);
+ kafkaCluster.stop();
+ kafkaCluster = null;
+ } catch (Exception e) {
+ log.error("Exception stopping Kafka cluster", e);
+ }
+
+
+ }
+
+ private static SolrInputDocument getDoc() {
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.nanoTime()));
+ doc.addField("text", "some test");
+ return doc;
+ }
+
+ private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
+ assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, expectedNumDocs);
+ }
+
+ private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception {
+ final String stashedDefault = client.getDefaultCollection();
+ try {
+ //client.setDefaultCollection(null);
+ client.request(createCmd);
+ } finally {
+ //client.setDefaultCollection(stashedDefault);
+ }
+ }
+
+ @Test
+ public void testFullCloudToCloudMultiCollection() throws Exception {
+ CollectionAdminRequest.Create create =
+ CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+
+ try {
+ solrCluster1.getSolrClient().request(create);
+ solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+ solrCluster2.getSolrClient().request(create);
+ solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+
+ CloudSolrClient client = solrCluster1.getSolrClient();
+
+ SolrInputDocument doc1 = getDoc();
+ SolrInputDocument doc2 = getDoc();
+ SolrInputDocument doc3 = getDoc();
+ SolrInputDocument doc4 = getDoc();
+ SolrInputDocument doc5 = getDoc();
+ SolrInputDocument doc6 = getDoc();
+ SolrInputDocument doc7 = getDoc();
+
+ client.add(COLLECTION, doc1);
+ client.add(ALT_COLLECTION, doc2);
+ client.add(COLLECTION, doc3);
+ client.add(COLLECTION, doc4);
+ client.add(ALT_COLLECTION, doc5);
+ client.add(ALT_COLLECTION, doc6);
+ client.add(COLLECTION, doc7);
+
+ client.commit(COLLECTION);
+ client.commit(ALT_COLLECTION);
+
+ System.out.println("Sent producer record");
+
+ assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 3);
+ assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 4);
+
+ } finally {
+ CollectionAdminRequest.Delete delete =
+ CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+ solrCluster1.getSolrClient().request(delete);
+ solrCluster2.getSolrClient().request(delete);
+ }
+ }
+
+
+ private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+ QueryResponse results = null;
+ boolean foundUpdates = false;
+ for (int i = 0; i < 100; i++) {
+ client.commit(collection);
+ results = client.query(collection, new SolrQuery(query));
+ if (results.getResults().getNumFound() == expectedNumDocs) {
+ foundUpdates = true;
+ break;
+ } else {
+ Thread.sleep(200);
+ }
+ }
+
+ assertTrue("results=" + results, foundUpdates);
+ }
+}
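The `assertClusterEventuallyHasDocs` helper above is a bounded-retry ("eventually consistent") assertion: commit, query, and re-check until the expected count appears or the retry budget runs out. A minimal sketch of the same pattern as a standalone helper (hypothetical names, plain Java, no SolrJ dependency) might look like:

```java
import java.util.function.BooleanSupplier;

// Hypothetical bounded-retry helper mirroring the polling loop in
// assertClusterEventuallyHasDocs: re-check a condition up to maxAttempts
// times, sleeping between failed checks, and report whether it ever held.
public class Eventually {
    public static boolean waitUntil(BooleanSupplier condition, int maxAttempts, long sleepMs)
            throws InterruptedException {
        for (int i = 0; i < maxAttempts; i++) {
            if (condition.getAsBoolean()) {
                return true; // condition observed; stop polling
            }
            Thread.sleep(sleepMs);
        }
        return false; // condition never held within the retry budget
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a document count that becomes visible after ~100 ms.
        long visibleAt = System.currentTimeMillis() + 100;
        boolean found = waitUntil(() -> System.currentTimeMillis() >= visibleAt, 100, 20);
        System.out.println(found);
    }
}
```

With 100 attempts and a 200 ms sleep, the test's version gives cross-DC replication roughly 20 seconds to converge before failing.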
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index e25ac83..927c5ab 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -105,6 +105,15 @@ import java.util.*;
public static void afterSolrAndKafkaIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
+ if (solrCluster1 != null) {
+ solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster1.shutdown();
+ }
+ if (solrCluster2 != null) {
+ solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster2.shutdown();
+ }
+
consumer.shutdown();
try {
@@ -115,14 +124,6 @@ import java.util.*;
log.error("Exception stopping Kafka cluster", e);
}
- if (solrCluster1 != null) {
- solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
- solrCluster1.shutdown();
- }
- if (solrCluster2 != null) {
- solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
- solrCluster2.shutdown();
- }
}
@After
@@ -141,7 +142,7 @@ import java.util.*;
QueryResponse results = null;
boolean foundUpdates = false;
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
@@ -160,17 +161,18 @@ import java.util.*;
assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
+ System.out.println("adding second docs");
addDocs(client, "second");
foundUpdates = false;
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
- solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
- results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("text:second"));
+ results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("text:second"));
if (results.getResults().getNumFound() == 7) {
foundUpdates = true;
} else {
- Thread.sleep(100);
+ Thread.sleep(200);
}
}
@@ -179,8 +181,8 @@ import java.util.*;
assertTrue("results=" + results, foundUpdates);
System.out.println("Rest: " + results);
- results1 =solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("second"));
- results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("second"));
+ results1 =solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("text:second"));
+ results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("text:second"));
assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
@@ -188,10 +190,10 @@ import java.util.*;
addDocs(client, "third");
foundUpdates = false;
- for (int i = 0; i < 1000; i++) {
+ for (int i = 0; i < 100; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
- solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
- results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("text:third"));
+ results = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("text:third"));
if (results.getResults().getNumFound() == 7) {
foundUpdates = true;
} else {
@@ -204,8 +206,8 @@ import java.util.*;
assertTrue("results=" + results, foundUpdates);
System.out.println("Rest: " + results);
- results1 =solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("third"));
- results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("third"));
+ results1 =solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("text:third"));
+ results2 = solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("text:third"));
assertEquals("results=" + results1, 7, results1.getResults().getNumFound());
assertEquals("results=" + results2, 7, results2.getResults().getNumFound());
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index f6c8844..991f67e 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -135,6 +135,15 @@ import java.util.Properties;
public static void afterSolrAndKafkaIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
+ if (solrCluster1 != null) {
+ solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster1.shutdown();
+ }
+ if (solrCluster2 != null) {
+ solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+ solrCluster2.shutdown();
+ }
+
consumer1.shutdown();
consumer2.shutdown();
@@ -144,14 +153,6 @@ import java.util.Properties;
log.error("Exception stopping Kafka cluster", e);
}
- if (solrCluster1 != null) {
- solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
- solrCluster1.shutdown();
- }
- if (solrCluster2 != null) {
- solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
- solrCluster2.shutdown();
- }
}
@After
diff --git a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
new file mode 100644
index 0000000..d21e410
--- /dev/null
+++ b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -0,0 +1,177 @@
+package org.apache.solr.update.processor;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequestBase;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.MirroringUpdateProcessor;
+import org.apache.solr.update.processor.RequestMirroringHandler;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
+
+ private UpdateRequestProcessor next;
+ private MirroringUpdateProcessor processor;
+ private RequestMirroringHandler requestMirroringHandler;
+ private AddUpdateCommand addUpdateCommand;
+ private DeleteUpdateCommand deleteUpdateCommand;
+ private SolrQueryRequestBase req;
+ UpdateRequest requestMock;
+ private UpdateRequestProcessor nextProcessor;
+ private SolrCore core;
+ private HttpSolrClient.Builder builder = Mockito.mock(HttpSolrClient.Builder.class);
+ private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
+ private CloudDescriptor cloudDesc;
+
+ @Before public void setUp() throws Exception {
+ super.setUp();
+ addUpdateCommand = new AddUpdateCommand(req);
+ addUpdateCommand.solrDoc = new SolrInputDocument();
+ addUpdateCommand.solrDoc.addField("id", "test");
+ req = Mockito.mock(SolrQueryRequestBase.class);
+ Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
+
+ requestMock = Mockito.mock(UpdateRequest.class);
+ addUpdateCommand.setReq(req);
+
+ nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
+
+ IndexSchema schema = Mockito.mock(IndexSchema.class);
+ Mockito.when(req.getSchema()).thenReturn(schema);
+
+ deleteUpdateCommand = new DeleteUpdateCommand(req);
+ deleteUpdateCommand.query = "*:*";
+
+ next = Mockito.mock(UpdateRequestProcessor.class);
+ requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
+ processor = new MirroringUpdateProcessor(next, true, true, 1000L, new ModifiableSolrParams(), DistributedUpdateProcessor.DistribPhase.NONE,
+ requestMirroringHandler) {
+ UpdateRequest createMirrorRequest() {
+ return requestMock;
+ }
+ };
+
+ core = Mockito.mock(SolrCore.class);
+ CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
+ cloudDesc = Mockito.mock(CloudDescriptor.class);
+ CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
+ ZkController zkController = Mockito.mock(ZkController.class);
+ ClusterState clusterState = Mockito.mock(ClusterState.class);
+ DocCollection docCollection = Mockito.mock(DocCollection.class);
+ DocRouter docRouter = Mockito.mock(DocRouter.class);
+ Slice slice = Mockito.mock(Slice.class);
+ ZkStateReader zkStateReader = Mockito.mock(ZkStateReader.class);
+ Replica replica = Mockito.mock(Replica.class);
+
+ Mockito.when(replica.getName()).thenReturn("replica1");
+ Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), Mockito.any())).thenReturn(replica);
+ Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
+ Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
+ Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
+ Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
+ Mockito.when(docRouter.getTargetSlice(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(slice);
+ Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
+ Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
+ Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
+ Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
+ Mockito.when(req.getCore()).thenReturn(core);
+ }
+
+ @Test public void testProcessAddWithinLimit() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "1");
+ AddUpdateCommand cmd = new AddUpdateCommand(req);
+ cmd.solrDoc = doc;
+ cmd.commitWithin = 1000;
+ cmd.overwrite = true;
+ processor.processAdd(cmd);
+ Mockito.verify(next).processAdd(cmd);
+ Mockito.verify(requestMirroringHandler).mirror(requestMock);
+ }
+
+ @Test public void testProcessAddExceedsLimit() {
+ AddUpdateCommand addUpdateCommand = new AddUpdateCommand(req);
+ SolrInputDocument solrInputDocument = new SolrInputDocument();
+ solrInputDocument.addField("id", "123");
+ solrInputDocument.addField("large_field", "Test ".repeat(10000));
+ addUpdateCommand.solrDoc = solrInputDocument;
+
+ Mockito.when(req.getCore()).thenReturn(core);
+ Mockito.when(req.getCore().getCoreDescriptor()).thenReturn(Mockito.mock(CoreDescriptor.class));
+ Mockito.when(req.getCore().getCoreDescriptor().getCloudDescriptor()).thenReturn(Mockito.mock(CloudDescriptor.class));
+ Mockito.when(req.getCore().getCoreContainer()).thenReturn(Mockito.mock(CoreContainer.class));
+ Mockito.when(req.getCore().getCoreContainer().getZkController()).thenReturn(Mockito.mock(ZkController.class));
+ Mockito.when(req.getCore().getCoreContainer().getZkController().getClusterState()).thenReturn(Mockito.mock(ClusterState.class));
+
+ SolrParams mirrorParams = new ModifiableSolrParams();
+ MirroringUpdateProcessor mirroringUpdateProcessorWithLimit = new MirroringUpdateProcessor(nextProcessor, true, false, // indexUnmirrorableDocs set to false
+ 50000, mirrorParams, DistributedUpdateProcessor.DistribPhase.NONE, requestMirroringHandler);
+
+ assertThrows(SolrException.class, () -> mirroringUpdateProcessorWithLimit.processAdd(addUpdateCommand));
+ }
+
+ @Test public void testProcessAddLeader() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ processor.processAdd(addUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+ }
+
+ @Test public void testProcessAddNotLeader() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
+ processor.processAdd(addUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(0)).mirror(Mockito.any());
+ }
+
+ @Test public void testProcessDelete() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ processor.processDelete(deleteUpdateCommand);
+ Mockito.verify(requestMirroringHandler, Mockito.times(1)).mirror(Mockito.any());
+ }
+
+ @Test public void testProcessDBQResults() throws Exception {
+ Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+ Mockito.when(builder.build()).thenReturn(client);
+ SolrInputDocument doc = new SolrInputDocument();
+ doc.addField("id", "test");
+ addUpdateCommand.solrDoc = doc;
+ processor.processAdd(addUpdateCommand);
+
+ SolrQuery query = new SolrQuery();
+ query.setQuery("*:*");
+ query.setRows(1000);
+ query.setSort("id", SolrQuery.ORDER.asc);
+
+ processor.processDelete(deleteUpdateCommand);
+ }
+
+ @Test public void testFinish() throws IOException {
+ processor.finish();
+ }
+}
\ No newline at end of file
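The `testProcessAddExceedsLimit` test above checks that an update larger than the configured cap is rejected (now a hard fail by default) rather than silently mirrored to Kafka. A minimal sketch of that size-gate idea, under assumed names and a simplified size heuristic (the real estimate in `MirroringUpdateProcessor` may differ):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of a mirror size gate: estimate a document's payload
// size and refuse to mirror it when it exceeds maxMirrorBytes. Class and
// method names here are illustrative, not the actual Solr CrossDC code.
public class MirrorSizeGate {
    private final long maxMirrorBytes;

    public MirrorSizeGate(long maxMirrorBytes) {
        this.maxMirrorBytes = maxMirrorBytes;
    }

    // Crude estimate: sum of field-name and string-value lengths.
    static long estimateBytes(Map<String, String> doc) {
        long total = 0;
        for (Map.Entry<String, String> e : doc.entrySet()) {
            total += e.getKey().length() + e.getValue().length();
        }
        return total;
    }

    /** Throws if the document is too large to enqueue for mirroring. */
    public void check(Map<String, String> doc) {
        long size = estimateBytes(doc);
        if (size > maxMirrorBytes) {
            throw new IllegalArgumentException(
                    "update of ~" + size + " bytes exceeds mirror cap " + maxMirrorBytes);
        }
    }

    public static void main(String[] args) {
        // Same shape as the test's oversized document: a ~50k-char field.
        Map<String, String> doc = new LinkedHashMap<>();
        doc.put("id", "123");
        doc.put("large_field", "Test ".repeat(10000));
        MirrorSizeGate gate = new MirrorSizeGate(50000);
        try {
            gate.check(doc);
            System.out.println("mirrored");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```

Failing fast at the producer keeps unmirrorable updates from being accepted locally and then lost on the Kafka side, which is the scenario the `INDEX_UNMIRRORABLE_DOCS=false` default guards against.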
diff --git a/crossdc-producer/src/test/resources/log4j2.xml b/crossdc-producer/src/test/resources/log4j2.xml
index 98c24fc..754c324 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -59,11 +59,11 @@
<Logger name="org.apache.solr.hadoop" level="INFO"/>
<Logger name="org.eclipse.jetty" level="INFO"/>
- <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" level="INFO"/>
- <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" level="INFO"/>
- <Logger name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" level="INFO"/>
- <Logger name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" level="INFO"/>
- <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" level="INFO"/>
+ <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" level="TRACE"/>
+ <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" level="TRACE"/>
+ <Logger name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" level="TRACE"/>
+ <Logger name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" level="TRACE"/>
+ <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" level="TRACE"/>
<Root level="INFO">
diff --git a/gradlew b/gradlew
old mode 100755
new mode 100644