This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/main by this push:
     new 5bb18df  Move batching from solr+kafka to just kafka. (#53)
5bb18df is described below

commit 5bb18df43fbab85043dd9b9b30319a4073ccffca
Author: Mark Robert Miller <[email protected]>
AuthorDate: Mon Apr 10 16:13:05 2023 -0500

    Move batching from solr+kafka to just kafka. (#53)
    
    * Move batching from solr+kafka to just kafka.
    Make consumer multi-threaded.
    Fix flakey tests.
    Allow multiple collections to use the same topic on the same consumer.
    Consumer can subscribe to multiple topics.
    Default to a hard fail for the client when update is too large for Kafka 
queue.
    
    * Few minor updates.
    
    * Add 22 to absolutely ensure unique id.
---
 .github/workflows/manual-test.yml                  |  44 +++
 CROSSDC.md                                         | 203 +++---------
 SolrAndKafkaIntegrationTest.java                   | 335 ++++++++++++++++++++
 build.gradle                                       |   2 +-
 .../solr/crossdc/common/KafkaCrossDcConf.java      |  12 +-
 .../common/MirroredSolrRequestSerializer.java      |  73 ++---
 .../org/apache/solr/crossdc/consumer/Consumer.java |  22 +-
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 341 ++++++++++++++++-----
 .../crossdc/consumer/KafkaCrossDcConsumerTest.java | 151 +++++++++
 crossdc-producer/build.gradle                      |   5 +-
 .../update/processor/MirroringUpdateProcessor.java | 102 +++---
 .../MirroringUpdateRequestProcessorFactory.java    |  17 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  | 149 ++++-----
 ...SolrAndKafkaMultiCollectionIntegrationTest.java | 222 ++++++++++++++
 .../solr/crossdc/SolrAndKafkaReindexTest.java      |  42 +--
 .../solr/crossdc/ZkConfigIntegrationTest.java      |  17 +-
 .../processor/MirroringUpdateProcessorTest.java    | 177 +++++++++++
 crossdc-producer/src/test/resources/log4j2.xml     |  10 +-
 gradlew                                            |   0
 19 files changed, 1463 insertions(+), 461 deletions(-)

diff --git a/.github/workflows/manual-test.yml 
b/.github/workflows/manual-test.yml
new file mode 100644
index 0000000..0d217a7
--- /dev/null
+++ b/.github/workflows/manual-test.yml
@@ -0,0 +1,44 @@
+name: SolrJ Tests
+
+on:
+  workflow_dispatch
+
+concurrency:
+  group: '${{ github.workflow }} @ ${{ github.event.pull_request.head.label || 
github.head_ref || github.ref }}'
+  cancel-in-progress: true
+
+
+jobs:
+  # NOTE: the former `build` job was dropped. It declared only a `container`
+  # (image `ubuntu-latest`, options `--network bridge`) with no `runs-on` and
+  # no `steps`, which GitHub Actions rejects as an invalid job, and
+  # `ubuntu-latest` is a runner label rather than a Docker image name.
+  test:
+    name: Run SolrJ Tests
+    runs-on: ubuntu-latest
+    steps:
+    # Setup: fetch the sources and install the JDK used by the Gradle build.
+    - uses: actions/checkout@v2
+    - name: Set up JDK 11
+      uses: actions/setup-java@v2
+      with:
+        distribution: 'temurin'
+        java-version: '11'
+        java-package: jdk
+    - name: Grant execute permission for scripts
+      run: chmod +x cluster.sh;chmod +x cluster-stop.sh;chmod +x manual-test.sh
+    - name: Grant execute permission for gradlew
+      run: chmod +x gradlew
+    - name: Build
+      run: ./gradlew build
+
+#    - uses: actions/cache@v2
+#      with:
+#        path: |
+#          ~/.gradle/caches
+#        key: ${{ runner.os }}-gradle-solrj-${{ hashFiles('versions.lock') }}
+#        restore-keys: |
+#          ${{ runner.os }}-gradle-solrj-
+#          ${{ runner.os }}-gradle-
+    # Runs the manual end-to-end test script from the repository root.
+    - name: Init cluster
+      run: ./manual-test.sh
diff --git a/CROSSDC.md b/CROSSDC.md
index 5c528f4..a0d5d75 100644
--- a/CROSSDC.md
+++ b/CROSSDC.md
@@ -1,199 +1,84 @@
 # Solr Cross DC: Getting Started
 
-**A simple cross-data-center fail-over solution for Apache Solr.**
-
-
-
-[TOC]
+Solr Cross DC is a simple cross-data-center fail-over solution for Apache 
Solr. It has three key components: the CrossDC Producer, the CrossDC Consumer, 
and Apache Kafka. The Producer is a Solr UpdateProcessor plugin that forwards 
updates from the primary data center, while the Consumer is an update request 
consumer application that receives updates in the backup data center. Kafka is 
the distributed queue that connects the two.
 
 ## Overview
 
-The design for this feature involves three key components:
+Solr Cross DC is designed to provide a simple and reliable way to replicate 
Solr updates across multiple data centers. It is particularly useful for 
organizations that need to ensure high availability and disaster recovery for 
their Solr clusters.
 
-- A UpdateProccessor plugin for Solr to forward updates from the primary data 
center.
-- An update request consumer application to receive updates in the backup data 
center.
-- A distributed queue to connect the above two.
-
-The UpdateProcessor plugin is called the CrossDC Producer, the consumer 
application is called the CrossDC Consumer, and the supported distributed queue 
application is Apache Kafka.
+The CrossDC Producer intercepts updates when the node acts as the leader and 
puts those updates onto the distributed queue. The CrossDC Consumer polls the 
distributed queue and forwards updates to the configured Solr cluster upon 
receiving the update requests.
 
 ## Getting Started
 
-To use Solr Cross DC, you must complete the following steps:
-
-- Startup or obtain access to an Apache Kafka cluster to provide the 
distributed queue between data centers.
-- Install the CrossDC Solr plugin on each of the nodes in your Solr cluster 
(in your primary and backup data centers) by placing the jar in the correct 
location and configuring solrconfig.xml to reference the new UpdateProcessor 
and then configure it for the Kafka cluster.
-- Install the CrossDC consumer application in the backup data center and 
configure it for the Kafka cluster and the Solr cluster it will send consumed 
updates to.
+To use Solr Cross DC, follow these steps:
 
-The Solr UpdateProccessor plugin will intercept updates when the node acts as 
the leader and then put those updates onto the distributed queue. The CrossDC 
Consumer application will poll the distributed queue and forward updates on to 
the configured Solr cluster upon receiving the update requests.
+1. Startup or obtain access to an Apache Kafka cluster to provide the 
distributed queue between data centers.
+2. Install the CrossDC Solr plugin on each node in your Solr cluster (in both 
primary and backup data centers). Place the jar in the sharedLib directory 
specified in solr.xml and configure solrconfig.xml to reference the new 
UpdateProcessor and configure it for the Kafka cluster.
+3. Install the CrossDC consumer application in the backup data center and 
configure it for the Kafka cluster and the Solr cluster it will send consumed 
updates to.
 
 ### Configuration and Startup
 
-The current configuration options are entirely minimal. Further configuration 
options will be added over time. At this early stage, some may also change.
-
 #### Installing and Configuring the Cross DC Producer Solr Plug-In
 
-1. Configure the sharedLib directory in solr.xml (eg sharedLIb=lib) and place 
the CrossDC producer plug-in jar file into the specified folder. It's not 
advisable to attempt to use the per SolrCore instance directory lib folder as 
you would have to duplicate the plug-in many times and manage it when creating 
new collections or adding replicas or shards.
-
-
-**solr.xml**
+1. Configure the sharedLib directory in solr.xml (e.g., sharedLib=lib) and 
place the CrossDC producer plug-in jar file into the specified folder. 
+    **solr.xml**
 
    ```xml
    <solr>
      <str name="sharedLib">${solr.sharedLib:}</str>
    ```
-
-
-
-2. Configure the new UpdateProcessor in solrconfig.xml
-
-   **NOTE:** `The following is not the recommended configuration approach in 
production, see the information on central configuration below!`
-
-
-
-**solrconfig.xml**
-
-   ```xml
-   <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
-   
-     <processor 
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
-       <str name="bootstrapServers">${bootstrapServers:}</str>
-       <str name="topicName">${topicName:}</str>
-     </processor>
-   
-     <processor class="solr.LogUpdateProcessorFactory" />
-     <processor class="solr.RunUpdateProcessorFactory" />
-   </updateRequestProcessorChain>
-   ```
-
-Notice that this update chain has been declared to be the default chain used.
-
-
-
-##### Configuration Properties
-
-There are two configuration properties. You can specify them directly, or use 
the above notation to allow them to specified via system property (generally 
configured for Solr in the bin/solr.in.sh file).
-
-   ```
-   bootstrapServers
-   ```
-
-The list of servers used to connect to the Kafka cluster, see 
https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
-
-   ```
-   topicName 
-   ```
-
-The Kafka topicName used to indicate which Kafka queue the Solr updates will 
be pushed on to.
-
-
-
-3. Add an external version constraint UpdateProcessor to the update chain 
added to solrconfig.xml to allow user-provided update versions (as opposed to 
the two Solr clusters using the independently managed built-in versioning).
-
-   
https://solr.apache.org/guide/8_11/update-request-processors.html#general-use-updateprocessorfactories
-
-   
https://solr.apache.org/docs/8_1_1/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.html
-
-
+2. Configure the new UpdateProcessor in solrconfig.xml.
+    ```xml
+       <updateRequestProcessorChain  name="mirrorUpdateChain" default="true">
+       
+         <processor 
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
+           <str name="bootstrapServers">${bootstrapServers:}</str>
+           <str name="topicName">${topicName:}</str>
+         </processor>
+       
+         <processor class="solr.LogUpdateProcessorFactory" />
+         <processor class="solr.RunUpdateProcessorFactory" />
+       </updateRequestProcessorChain>
+    ```
+3. Add an external version constraint UpdateProcessor to the update chain 
added to solrconfig.xml to allow user-provided update versions.
+   See 
https://solr.apache.org/guide/8_11/update-request-processors.html#general-use-updateprocessorfactories
 and 
https://solr.apache.org/docs/8_1_1/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.html
 4. Start or restart the Solr cluster(s).
 
+##### Configuration Properties
 
+There are two configuration properties: 
+- `bootstrapServers`: list of servers used to connect to the Kafka cluster
+- `topicName`: Kafka topicName used to indicate which Kafka queue the Solr 
updates will be pushed on 
 
 #### Installing and Configuring the CrossDC Consumer Application
 
 1. Uncompress the distribution tar or zip file for the CrossDC Consumer into 
an appropriate install location on a node in the receiving data center.
-2. You can start the Consumer process via the included shell start script at 
bin/crossdc-consumer.
-3. You can configure the CrossDC Consumer via Java system properties pass in 
the CROSSDC_CONSUMER_OPTS environment variable, i.e. 
CROSSDC_CONSUMER_OPTS="-DbootstrapServers=127.0.0.1:2181 
-DzkConnectString=127.0.0.1:2181 -DtopicName=crossdc" bin/crossdc-consumer
-
-The required configuration properties are:
-
+2. Start the Consumer process via the included shell start script at 
bin/crossdc-consumer.
+3. Configure the CrossDC Consumer via Java system properties passed in the 
CROSSDC_CONSUMER_OPTS environment variable.
 
-   *bootstrapServers* - the list of servers used to connect to the Kafka 
cluster 
https://kafka.apache.org/28/documentation.html#producerconfigs_bootstrap.servers
+The required configuration properties are: 
+- `bootstrapServers`: list of servers used to connect to the Kafka cluster 
+- `topicName`: Kafka topicName used to indicate which Kafka queue the Solr 
updates will be pushed to. This can be a comma separated list for the Consumer 
if you would like to consume multiple topics.
+- `zkConnectString`: Zookeeper connection string used by Solr to connect to 
its Zookeeper cluster in the backup data center
 
-   *topicName* - the Kafka topicName used to indicate which Kafka queue the 
Solr updates will be pushed to.
+The following additional configuration properties should either be specified 
for both the producer and the consumer or in the shared Zookeeper central 
config properties file:
 
-   *zkConnectString* - the Zookeeper connection string used by Solr to connect 
to its Zookeeper cluster in the backup data center
-
-Additional configuration properties:
-
-   *groupId* - the group id to give Kafka for the consumer, default to the 
empty string if not specified.
-
-The following additional configuration properties should either be specified 
for both the producer and the consumer or in the shared Zookeeper
-central config properties file. This is because the Consumer will use a 
Producer for retries.
-
-   *batchSizeBytes* - the maximum batch size in bytes for the queue
-   *bufferMemoryBytes* - the amount of memory in bytes allocated by the 
Producer in total for buffering 
-   *lingerMs* - the amount of time that the Producer will wait to add to a 
batch
-   *requestTimeout* - request timeout for the Producer - when used for the 
Consumers retry Producer, this should be less than the timeout that will cause 
the Consumer to be removed from the group for taking too long.
+- `batchSizeBytes`: maximum batch size in bytes for the queue
+- `bufferMemoryBytes`: memory allocated by the Producer in total for buffering 
+- `lingerMs`: amount of time that the Producer will wait to add to a batch
+- `requestTimeout`: request timeout for the Producer 
 
 #### Central Configuration Option
 
-You can optionally manage the configuration centrally in Solr's Zookeeper 
cluster by placing a properties file called *crossdc.properties* in the root 
Solr Zookeeper znode, eg, */solr/crossdc.properties*.  This allows you to 
update the configuration in a central location rather than at each 
solrconfig.xml in each Solr node and also automatically deals with new Solr 
nodes or Consumers to come up without requiring additional configuration.
-
-
-
-Both *bootstrapServers* and *topicName* properties can be put in this file, in 
which case you would not have to specify any Kafka configuration in the 
solrconfig.xml for the CrossDC Producer Solr plugin. Likewise, for the CrossDC 
Consumer application, you would only have to set *zkConnectString* for the 
local Solr cluster. Note that the two components will be looking in the 
Zookeeper clusters in their respective data center locations.
-
-You can override the properties file location and znode name in Zookeeper 
using the system property 
*zkCrossDcPropsPath=/path/to/props_file_name.properties*
+You can manage the configuration centrally in Solr's Zookeeper cluster by 
placing a properties file called *crossdc.properties* in the root Solr 
Zookeeper znode, eg, */solr/crossdc.properties*. Both *bootstrapServers* and 
*topicName* properties can be put in this file. For the CrossDC Consumer 
application, you would only have to set *zkConnectString* for the local Solr 
cluster.
 
 #### Making the Cross DC UpdateProcessor Optional in a Common solrconfig.xml
 
-The simplest and least invasive way to control whether the Cross DC 
UpdateProcessor is on or off for a node is to configure the update chain it's 
used in to be the default chain or not via Solr's system property configuration 
syntax.  This syntax takes the form of ${*system_property_name*} and will be 
substituted with the value of that system property when the configuration is 
parsed. You can specify a default value using the following syntax: 
${*system_property_name*:*default_value*}. Y [...]
-
-*Having a separate updateRequestProcessorChain avoids a lot of additional 
constraints you have to deal with or consider, now or in the future, when 
compared to forcing all Cross DC and non-Cross DC use down a single, required, 
common updateRequestProcessorChain.*
-
-Further, any application consuming the configuration with no concern for 
enabling Cross DC will not be artificially limited in its ability to define, 
manage and use updateRequestProcessorChain's.
-
-The following would enable a system property to safely and non invasively 
enable or disable Cross DC for a node:
-
-
-```xml
-<updateRequestProcessorChain  name="crossdcUpdateChain" 
default="${crossdcEnabled:false}">
-  <processor 
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
-    <bool name="enabled">${enabled:false}</bool>
-  </processor>
-  <processor class="solr.LogUpdateProcessorFactory" />
-  <processor class="solr.RunUpdateProcessorFactory" />
-</updateRequestProcessorChain>
-```
-
-
-
-The above configuration would default to Cross DC being disabled with minimal 
impact to any non-Cross DC use, and Cross DC could be enabled by starting Solr 
with the system property crossdcEnabled=true.
-
-The last chain to declare it's the default wins, so you can put this at the 
bottom of almost any existing solrconfig.xml to create an optional Cross DC 
path without having to audit, understand, adapt, or test existing non-Cross DC 
paths as other options call for.
-
-The above is the simplest and least obtrusive way to manage an on/off switch 
for Cross DC.
-
-**Note:** If your configuration already makes use of update handlers and/or 
updates independently specifying different updateRequestProcessorChains, your 
solution may end up a bit more sophisticated.
-
-
-
-For situations where you do want to control and enforce a single 
updateRequestProcessorChain path for every consumer of the solrconfig.xml, it's 
enough to simply use the *enabled* attribute, turning the processor into a NOOP 
in the chain.
-
-
-
-```xml
-<updateRequestProcessorChain  name="crossdcUpdateChain">
-  <processor 
class="org.apache.solr.update.processor.MirroringUpdateRequestProcessorFactory">
-    <bool name="enabled">${enabled:false}</bool>
-  </processor>
-  <processor class="solr.LogUpdateProcessorFactory" />
-  <processor class="solr.RunUpdateProcessorFactory" />
-</updateRequestProcessorChain>
-```
-
-
+Use the *enabled* attribute: setting it to false turns the processor into a 
NOOP in the chain.
 
 ## Limitations
 
-- Delete-By-Query is not officially supported.
-
-    - Work-In-Progress: A non-efficient option to issue multiple delete by id 
queries using the results of a given standard query.
-
-    - Simply forwarding a real Delete-By-Query could also be reasonable if it 
is not strictly reliant on not being reordered with other requests.
-
-
-
-## Additional Notes
+- Delete-By-Query converts to DeleteById, which can be much less efficient for 
queries matching large numbers of documents.
+  Forwarding a real Delete-By-Query could also be reasonable if it is not 
strictly reliant on not being reordered with other requests.
 
-In these early days, it may help to reference the *cluster.sh* script located 
in the root of the CrossDC repository. This script is a helpful developer tool 
for manual testing and it will download Solr and Kafka and then configure both 
for Cross DC.
\ No newline at end of file
+The *cluster.sh* script located in the root of the CrossDC repository is a 
helpful developer tool for manual testing: it will download Solr and Kafka and 
then configure both for Cross DC.
\ No newline at end of file
diff --git a/SolrAndKafkaIntegrationTest.java b/SolrAndKafkaIntegrationTest.java
new file mode 100644
index 0000000..f935538
--- /dev/null
+++ b/SolrAndKafkaIntegrationTest.java
@@ -0,0 +1,335 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseCloudSolrClient;
+import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.sys.Prop;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+import static org.mockito.Mockito.spy;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { 
SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest 
extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int MAX_DOC_SIZE_BYTES = 
Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE);
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public EmbeddedKafkaCluster kafkaCluster;
+
+  protected volatile MiniSolrCloudCluster solrCluster1;
+  protected volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer;
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+  private static String ALT_COLLECTION = "collection2";
+
+  /**
+   * Per-test fixture. Starts an embedded Kafka cluster with {@code NUM_BROKERS} brokers and
+   * creates {@code TOPIC} on it, brings up two Solr clusters that each get a collection named
+   * {@code COLLECTION}, and starts a {@link Consumer} configured with the Kafka bootstrap
+   * servers, the topic, and the ZooKeeper address of {@code solrCluster2}.
+   *
+   * @throws Exception if any of the clusters or the consumer fails to start
+   */
+  @Before
+  public void beforeSolrAndKafkaIntegrationTest() throws Exception {
+    consumer = new Consumer();
+    Properties config = new Properties();
+    //config.put("unclean.leader.election.enable", "true");
+    //config.put("enable.partition.eof", "false");
+
+    // Report the broker addresses with 127.0.0.1 substituted for "localhost".
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    // NOTE(review): presumably these system properties are picked up by the mirroring update
+    // processor configuration inside the Solr nodes started below - confirm against the configset.
+    System.setProperty("topicName", TOPIC);
+    System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+    System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+
+    // First cluster: the one the tests in this class send their updates to.
+    solrCluster1 = new SolrCloudTestCase.Builder(1, 
createTempDir()).addConfig("conf",
+        
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    CollectionAdminRequest.Create create =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(create);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    // Second cluster: same configset and collection name; the consumer below targets its ZooKeeper.
+    solrCluster2 = new SolrCloudTestCase.Builder(1, 
createTempDir()).addConfig("conf",
+        
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
+
+    CollectionAdminRequest.Create create2 =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(create2);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    // Consumer configuration: read TOPIC from the embedded Kafka and use solrCluster2's ZooKeeper.
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, 
MAX_DOC_SIZE_BYTES);
+    consumer.start(properties);
+
+  }
+
+  /**
+   * Tears down everything started by the {@code @Before} method: both Solr clusters, the
+   * consumer, and the embedded Kafka cluster.
+   *
+   * <p>Each step runs in a {@code finally} chain so that a failure while shutting down one
+   * component does not leave the remaining ones running and leaking into later tests.
+   *
+   * @throws Exception if shutting down a Solr cluster or the consumer fails
+   */
+  @After
+  public void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    try {
+      shutdownSolrCluster(solrCluster1);
+    } finally {
+      try {
+        shutdownSolrCluster(solrCluster2);
+      } finally {
+        try {
+          // Guard against a fixture that failed before the consumer was created.
+          if (consumer != null) {
+            consumer.shutdown();
+          }
+        } finally {
+          consumer = null;
+          stopKafkaCluster();
+        }
+      }
+    }
+  }
+
+  /** Prints the ZooKeeper layout of {@code cluster} and shuts it down; no-op for {@code null}. */
+  private static void shutdownSolrCluster(MiniSolrCloudCluster cluster) throws Exception {
+    if (cluster == null) {
+      return;
+    }
+    cluster.getZkServer().getZkClient().printLayoutToStdOut();
+    cluster.shutdown();
+  }
+
+  /** Stops the embedded Kafka cluster, logging (not rethrowing) any failure. */
+  private void stopKafkaCluster() {
+    try {
+      //kafkaCluster.deleteAllTopicsAndWait(10000);
+      if (kafkaCluster != null) {
+        kafkaCluster.stop();
+      }
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
+    } finally {
+      kafkaCluster = null;
+    }
+  }
+
+//  public void testFullCloudToCloud() throws Exception {
+//    CloudSolrClient client = solrCluster1.getSolrClient();
+//    SolrInputDocument doc = new SolrInputDocument();
+//    doc.addField("id", String.valueOf(System.currentTimeMillis()));
+//    doc.addField("text", "some test");
+//
+//    client.add(doc);
+//
+//    client.commit(COLLECTION);
+//
+//    System.out.println("Sent producer record");
+//
+//    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+//  }
+//
+  /**
+   * Builds a small test document with a unique {@code id} and a fixed {@code text} value.
+   *
+   * <p>The id combines {@link System#nanoTime()} with a random UUID. The timer alone is not
+   * guaranteed to advance between two consecutive calls, and a repeated id would make one
+   * document silently overwrite another and break the expected document counts.
+   *
+   * @return a new document with fields {@code id} and {@code text}
+   */
+  private static SolrInputDocument getDoc() {
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", System.nanoTime() + "-" + java.util.UUID.randomUUID());
+    doc.addField("text", "some test");
+    return doc;
+  }
+//
+//  public void testProducerToCloud() throws Exception {
+//    Properties properties = new Properties();
+//    properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
+//    properties.put("acks", "all");
+//    properties.put("retries", 1);
+//    properties.put("batch.size", 1);
+//    properties.put("buffer.memory", 33554432);
+//    properties.put("linger.ms", 1);
+//    properties.put("key.serializer", StringSerializer.class.getName());
+//    properties.put("value.serializer", 
MirroredSolrRequestSerializer.class.getName());
+//    Producer<String, MirroredSolrRequest> producer = new 
KafkaProducer(properties);
+//    UpdateRequest updateRequest = new UpdateRequest();
+//    updateRequest.setParam("shouldMirror", "true");
+//    updateRequest.add("id", String.valueOf(System.currentTimeMillis()), 
"text", "test");
+//    updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22), 
"text", "test2");
+//    updateRequest.setParam("collection", COLLECTION);
+//    MirroredSolrRequest mirroredSolrRequest = new 
MirroredSolrRequest(updateRequest);
+//    System.out.println("About to send producer record");
+//    producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest), (metadata, 
exception) -> {
+//      log.info("Producer finished sending metadata={}, exception={}", 
metadata, exception);
+//    });
+//    producer.flush();
+//
+//    System.out.println("Sent producer record");
+//
+//    solrCluster2.getSolrClient().commit(COLLECTION);
+//
+//    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 2);
+//
+//    producer.close();
+//  }
+
+//  @Test
+//  public void testMirroringUpdateProcessor() throws Exception {
+//    final SolrInputDocument tooLargeDoc = new SolrInputDocument();
+//    tooLargeDoc.addField("id", "tooLarge-" + 
String.valueOf(System.currentTimeMillis()));
+//    tooLargeDoc.addField("text", new String(new byte[2 * 
MAX_DOC_SIZE_BYTES]));
+//    final SolrInputDocument normalDoc = new SolrInputDocument();
+//    normalDoc.addField("id", "normalDoc-" + 
String.valueOf(System.currentTimeMillis()));
+//    normalDoc.addField("text", "Hello world");
+//    final List<SolrInputDocument> docsToIndex = new ArrayList<>();
+//    docsToIndex.add(normalDoc);
+//    docsToIndex.add(tooLargeDoc);
+//
+//    final CloudSolrClient cluster1Client = solrCluster1.getSolrClient();
+//    try {
+//      cluster1Client.add(docsToIndex);
+//    } catch (BaseCloudSolrClient.RouteException e) {
+//      // expected
+//    }
+//    cluster1Client.commit(COLLECTION);
+//
+//    // Primary and secondary should each only index 'normalDoc'
+//    final String normalDocQuery = "id:" + 
normalDoc.get("id").getFirstValue();
+//    assertCluster2EventuallyHasDocs(COLLECTION, normalDocQuery, 1);
+//    assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
+//    assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, 
normalDocQuery, 1);
+//    assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, "*:*", 1);
+//
+//    // Create new primary+secondary collection where 'tooLarge' docs ARE 
indexed on the primary
+//    CollectionAdminRequest.Create create =
+//        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1)
+//            .withProperty("indexUnmirrorableDocs", "true");
+//    try {
+//      solrCluster1.getSolrClient().request(create);
+//      solrCluster2.getSolrClient().request(create);
+//      solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+//      solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+//
+//      cluster1Client.add(ALT_COLLECTION, docsToIndex);
+//      cluster1Client.commit(ALT_COLLECTION);
+//
+//      // Primary should have both 'normal' and 'large' docs; secondary 
should only have 'normal' doc.
+//      assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 
2);
+//      assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
+//      assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
+//    } finally {
+//      CollectionAdminRequest.Delete delete =
+//        CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+//      solrCluster1.getSolrClient().request(delete);
+//      solrCluster2.getSolrClient().request(delete);
+//    }
+//  }
+
+  private void assertCluster2EventuallyHasDocs(String collection, String 
query, int expectedNumDocs) throws Exception {
+    assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, 
query, expectedNumDocs);
+  }
+
+  /**
+   * Sends the given collection-creation request through {@code client}.
+   *
+   * <p>An earlier version stashed and restored the client's default collection around the
+   * request; that logic was already commented out, so the leftover unused local and the empty
+   * try/finally have been removed.
+   *
+   * @param client client used to issue the request
+   * @param createCmd the collection-creation request to send
+   * @throws Exception if the request fails
+   */
+  private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception {
+    client.request(createCmd);
+  }
+
+  /**
+   * Creates {@code ALT_COLLECTION} on both clusters, sends seven documents to cluster 1 spread
+   * over {@code COLLECTION} (four) and {@code ALT_COLLECTION} (three), and asserts that cluster 2
+   * eventually reports the same per-collection counts. The extra collection is deleted from
+   * both clusters afterwards.
+   */
+  @Test
+  public void testFullCloudToCloudMultiCollection() throws Exception {
+    final CollectionAdminRequest.Create createAlt =
+        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+
+    try {
+      solrCluster1.getSolrClient().request(createAlt);
+      solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+      solrCluster2.getSolrClient().request(createAlt);
+      solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+      final CloudSolrClient primary = solrCluster1.getSolrClient();
+
+      // Target collection for each document, in the order the documents are sent.
+      final String[] targets = {
+          COLLECTION, ALT_COLLECTION, COLLECTION, COLLECTION,
+          ALT_COLLECTION, ALT_COLLECTION, COLLECTION
+      };
+
+      // Build every document up front, then send them, interleaving the two collections.
+      final List<SolrInputDocument> docs = new ArrayList<>(targets.length);
+      for (int i = 0; i < targets.length; i++) {
+        docs.add(getDoc());
+      }
+      for (int i = 0; i < targets.length; i++) {
+        primary.add(targets[i], docs.get(i));
+      }
+
+      primary.commit(COLLECTION);
+      primary.commit(ALT_COLLECTION);
+
+      System.out.println("Sent producer record");
+
+      assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 3);
+      assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 4);
+
+    } finally {
+      final CollectionAdminRequest.Delete dropAlt =
+          CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+      solrCluster1.getSolrClient().request(dropAlt);
+      solrCluster2.getSolrClient().request(dropAlt);
+    }
+  }
+
+
+  /**
+   * Polls {@code collection} until {@code query} matches exactly {@code expectedNumDocs}
+   * documents, committing before every attempt. Makes up to 100 attempts, sleeping 200 ms after
+   * each miss, and fails the test if the expected count is never observed.
+   *
+   * <p>The loop stops at the first match; previously it kept committing and querying for all
+   * remaining iterations even after the expected count had been seen.
+   *
+   * @param client client used to commit and query
+   * @param collection collection to commit and query
+   * @param query query string to run
+   * @param expectedNumDocs exact number of matches required
+   * @throws Exception if a commit or query fails, or the thread is interrupted while sleeping
+   */
+  private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      client.commit(collection);
+      results = client.query(collection, new SolrQuery(query));
+      if (results.getResults().getNumFound() == expectedNumDocs) {
+        foundUpdates = true;
+        break;
+      }
+      Thread.sleep(200);
+    }
+
+    // The last response is included in the message to show the count that was actually seen.
+    assertTrue("results=" + results, foundUpdates);
+  }
+}
diff --git a/build.gradle b/build.gradle
index 9a38422..25af428 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,5 +28,5 @@ description 'Root for Solr plugins sandbox'
 
 subprojects {
     group "org.apache.solr.crossdc"
-    group "org.apache.solr.encryption"
+    //group "org.apache.solr.encryption"
 }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index 0b45bbb..0dbc160 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -23,7 +23,6 @@ import 
org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 
 import java.util.*;
 
-import static 
org.apache.solr.crossdc.common.SensitivePropRedactionUtils.propertyRequiresRedaction;
 import static 
org.apache.solr.crossdc.common.SensitivePropRedactionUtils.redactPropertyIfNecessary;
 
 public class KafkaCrossDcConf extends CrossDcConf {
@@ -34,10 +33,13 @@ public class KafkaCrossDcConf extends CrossDcConf {
   public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
   public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
   public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
+  private static final String DEFAULT_INDEX_UNMIRRORABLE_DOCS = "false";
   public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
   public static final String DEFAULT_NUM_RETRIES = null; // by default, we 
control retries with DELIVERY_TIMEOUT_MS_DOC
   private static final String DEFAULT_RETRY_BACKOFF_MS = "500";
 
+  private static final String DEFAULT_CONSUMER_PROCESSING_THREADS = "5";
+
   private static final String DEFAULT_DELIVERY_TIMEOUT_MS = "120000";
 
   public static final String DEFAULT_MAX_POLL_RECORDS = "500"; // same default 
as Kafka
@@ -70,12 +72,16 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String ENABLE_DATA_COMPRESSION = "enableDataCompression";
 
+  public static final String INDEX_UNMIRRORABLE_DOCS = "indexUnmirrorableDocs";
+
   public static final String SLOW_SUBMIT_THRESHOLD_MS = 
"slowSubmitThresholdMs";
 
   public static final String NUM_RETRIES = "numRetries";
 
   public static final String RETRY_BACKOFF_MS = "retryBackoffMs";
 
+  public static final String CONSUMER_PROCESSING_THREADS = 
"consumerProcessingThreads";
+
   public static final String DELIVERY_TIMEOUT_MS = "retryBackoffMs";
 
   public static final String FETCH_MIN_BYTES = "fetchMinBytes";
@@ -100,8 +106,6 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
   public static final String GROUP_ID = "groupId";
 
-
-
   static {
     List<ConfigProperty> configProperties = new ArrayList<>(
         List.of(new ConfigProperty(TOPIC_NAME), new 
ConfigProperty(BOOTSTRAP_SERVERS),
@@ -111,6 +115,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT),
             new ConfigProperty(MAX_REQUEST_SIZE_BYTES, 
DEFAULT_MAX_REQUEST_SIZE),
             new ConfigProperty(ENABLE_DATA_COMPRESSION, 
DEFAULT_ENABLE_DATA_COMPRESSION),
+            new ConfigProperty(INDEX_UNMIRRORABLE_DOCS, 
DEFAULT_INDEX_UNMIRRORABLE_DOCS),
             new ConfigProperty(SLOW_SUBMIT_THRESHOLD_MS, 
DEFAULT_SLOW_SEND_THRESHOLD),
             new ConfigProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES),
             new ConfigProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS),
@@ -121,6 +126,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
             new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
             new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+            new ConfigProperty(CONSUMER_PROCESSING_THREADS, 
DEFAULT_CONSUMER_PROCESSING_THREADS),
 
             new ConfigProperty(MAX_PARTITION_FETCH_BYTES, 
DEFAULT_MAX_PARTITION_FETCH_BYTES),
             new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 3f0684d..f93c361 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -52,49 +52,51 @@ public class MirroredSolrRequestSerializer implements 
Serializer<MirroredSolrReq
     @Override
     public MirroredSolrRequest deserialize(String topic, byte[] data) {
         Map solrRequest;
+        UpdateRequest updateRequest = new UpdateRequest();
+        try (JavaBinCodec codec = new JavaBinCodec()) {
+            ByteArrayInputStream bais = new ByteArrayInputStream(data);
 
-        JavaBinCodec codec = new JavaBinCodec();
-        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+            try {
+                solrRequest = (Map) codec.unmarshal(bais);
 
-        try {
-            solrRequest = (Map) codec.unmarshal(bais);
+                if (log.isTraceEnabled()) {
+                    log.trace("Deserialized class={} solrRequest={}", 
solrRequest.getClass().getName(),
+                        solrRequest);
+                }
 
-            if (log.isTraceEnabled()) {
-                log.trace("Deserialized class={} solrRequest={}", 
solrRequest.getClass().getName(),
-                    solrRequest);
+            } catch (Exception e) {
+                log.error("Exception unmarshalling JavaBin", e);
+                throw new RuntimeException(e);
             }
 
 
-        } catch (Exception e) {
-            log.error("Exception unmarshalling JavaBin", e);
-            throw new RuntimeException(e);
-        }
-
-        UpdateRequest updateRequest = new UpdateRequest();
-        List docs = (List) solrRequest.get("docs");
-        if (docs != null) {
-            updateRequest.add(docs);
-        } else {
-            updateRequest.add("id", "1");
-            updateRequest.getDocumentsMap().clear();
-        }
-
-        List deletes = (List) solrRequest.get("deletes");
-        if (deletes != null) {
-            updateRequest.deleteById(deletes);
-        }
+            List docs = (List) solrRequest.get("docs");
+            if (docs != null) {
+                updateRequest.add(docs);
+            } else {
+                updateRequest.add("id", "1"); // TODO huh?
+                updateRequest.getDocumentsMap().clear();
+            }
 
-        List deletesQuery = (List) solrRequest.get("deleteQuery");
-        if (deletesQuery != null) {
-            for (Object delQuery : deletesQuery) {
-                updateRequest.deleteByQuery((String) delQuery);
+            List deletes = (List) solrRequest.get("deletes");
+            if (deletes != null) {
+                updateRequest.deleteById(deletes);
             }
-        }
 
+            List deletesQuery = (List) solrRequest.get("deleteQuery");
+            if (deletesQuery != null) {
+                for (Object delQuery : deletesQuery) {
+                    updateRequest.deleteByQuery((String) delQuery);
+                }
+            }
 
-        Map params = (Map) solrRequest.get("params");
-        if (params != null) {
-            updateRequest.setParams(ModifiableSolrParams.of(new 
MapSolrParams(params)));
+            Map params = (Map) solrRequest.get("params");
+            if (params != null) {
+                updateRequest.setParams(ModifiableSolrParams.of(new 
MapSolrParams(params)));
+            }
+        } catch (IOException e) {
+            log.error("Error in deserialize", e);
+            throw new RuntimeException(e);
         }
 
         return new MirroredSolrRequest(updateRequest);
@@ -120,12 +122,10 @@ public class MirroredSolrRequestSerializer implements 
Serializer<MirroredSolrReq
         try (JavaBinCodec codec = new JavaBinCodec(null)) {
 
             ExposedByteArrayOutputStream baos = new 
ExposedByteArrayOutputStream();
-            Map map = new HashMap(4);
+            Map map = new HashMap(8);
             map.put("params", solrRequest.getParams());
             map.put("docs", solrRequest.getDocuments());
 
-            // TODO
-            //map.put("deletes", solrRequest.getDeleteByIdMap());
             map.put("deletes", solrRequest.getDeleteById());
             map.put("deleteQuery", solrRequest.getDeleteQuery());
 
@@ -134,6 +134,7 @@ public class MirroredSolrRequestSerializer implements 
Serializer<MirroredSolrReq
             return baos.byteArray();
 
         } catch (IOException e) {
+            log.error("Error in serialize", e);
             throw new RuntimeException(e);
         }
 
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 08749b0..e4bc732 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -22,7 +22,6 @@ import org.apache.solr.crossdc.common.ConfigProperty;
 import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.SensitivePropRedactionUtils;
-import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.eclipse.jetty.server.Server;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,8 +31,10 @@ import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
 
@@ -47,6 +48,8 @@ public class Consumer {
     private Server server;
     private CrossDcConsumer crossDcConsumer;
 
+    private CountDownLatch startLatch = new CountDownLatch(1);
+
 
     public void start() {
         start(new HashMap<>());
@@ -104,25 +107,29 @@ public class Consumer {
         //connector.setPort(port);
         //server.setConnectors(new Connector[] {connector})
         KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
-        crossDcConsumer = getCrossDcConsumer(conf);
+        crossDcConsumer = getCrossDcConsumer(conf, startLatch);
 
         // Start consumer thread
 
         log.info("Starting CrossDC Consumer {}", conf);
 
-        /**
-         * ExecutorService to manage the cross-dc consumer threads.
-         */
         ExecutorService consumerThreadExecutor = 
Executors.newSingleThreadExecutor();
         consumerThreadExecutor.submit(crossDcConsumer);
 
         // Register shutdown hook
         Thread shutdownHook = new Thread(() -> System.out.println("Shutting 
down consumers!"));
         Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+        try {
+            startLatch.await(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new 
SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
+        }
     }
 
-    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
-        return new KafkaCrossDcConsumer(conf);
+    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf, 
CountDownLatch startLatch) {
+        return new KafkaCrossDcConsumer(conf, startLatch);
     }
 
     public static void main(String[] args) {
@@ -141,7 +148,6 @@ public class Consumer {
      * Abstract class for defining cross-dc consumer
      */
     public abstract static class CrossDcConsumer implements Runnable {
-        SolrMessageProcessor messageProcessor;
         abstract void shutdown();
 
     }
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 451253b..9ac506a 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -8,7 +8,11 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.params.ModifiableSolrParams;
 import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.common.util.NamedList;
 import org.apache.solr.crossdc.common.*;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
 import org.slf4j.Logger;
@@ -16,35 +20,59 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.Properties;
+import java.util.*;
+import java.util.concurrent.*;
 
 /**
- * Class to run the consumer thread for Kafka. This also contains the 
implementation for retries and
- * resubmitting to the queue in case of temporary failures.
+ * This is a Java class called KafkaCrossDcConsumer, which is part of the 
Apache Solr framework.
+ * It consumes messages from Kafka and mirrors them into a Solr instance. It 
uses a KafkaConsumer
+ * object to subscribe to one or more topics and receive ConsumerRecords that 
contain MirroredSolrRequest
+ * objects. The SolrMessageProcessor handles each MirroredSolrRequest and 
sends the resulting
+ * UpdateRequest to the CloudSolrClient for indexing. A ThreadPoolExecutor is 
used to handle the update
+ * requests asynchronously. The KafkaCrossDcConsumer also handles offset 
management, committing offsets
+ * to Kafka and can seek to specific offsets for error recovery. The class 
provides methods to start and
+ * stop the consumer thread.
  */
 public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
   private final MetricRegistry metrics = 
SharedMetricRegistries.getOrCreate("metrics");
 
-  private final KafkaConsumer<String, MirroredSolrRequest> consumer;
-  private final KafkaMirroringSink kafkaMirroringSink;
+  private final KafkaConsumer<String,MirroredSolrRequest> consumer;
+  private final CountDownLatch startLatch;
+  KafkaMirroringSink kafkaMirroringSink;
 
   private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
-  private final String topicName;
+  private final String[] topicNames;
   private final SolrMessageProcessor messageProcessor;
 
   private final CloudSolrClient solrClient;
 
+  // Bounded (capacity 10) work queue that is handed to the ThreadPoolExecutor built in the
+  // constructor. offer() is overridden to delegate to put(), so a caller offering to a full
+  // queue waits for space instead of getting an immediate false.
+  // NOTE(review): ThreadPoolExecutor enqueues through offer(), so this presumably makes
+  // executor.submit() block the polling thread when workers fall behind - confirm intent.
+  private final ArrayBlockingQueue<Runnable> queue = new 
ArrayBlockingQueue<>(10) {
+    @Override public boolean offer(Runnable r) {
+      //return super.offer(r);
+      try {
+        super.put(r);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag and report the task as not enqueued.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return true;
+    }
+
+  };
+  private final ThreadPoolExecutor executor;
+
+  private final ConcurrentHashMap<TopicPartition,PartitionWork> 
partitionWorkMap = new ConcurrentHashMap<>();
+
   /**
-   * @param conf The Kafka consumer configuration
+   * @param conf       The Kafka consumer configuration
+   * @param startLatch Latch that {@link #run()} counts down once the consumer has subscribed to its topics
    */
-  public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
-    this.topicName = conf.get(KafkaCrossDcConf.TOPIC_NAME);
+  public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch 
startLatch) {
 
+    this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
+    this.startLatch = startLatch;
     final Properties kafkaConsumerProps = new Properties();
 
     kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
@@ -66,10 +94,17 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
 
     kafkaConsumerProps.putAll(conf.getAdditionalProperties());
+    int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
+    executor = new ThreadPoolExecutor(threads, threads, 0L, 
TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
+      @Override public Thread newThread(Runnable r) {
+        Thread t = new Thread(r);
+        t.setName("KafkaCrossDcConsumerWorker");
+        return t;
+      }
+    });
+    executor.prestartAllCoreThreads();
 
-    solrClient =
-        new 
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
-            Optional.empty()).build();
+    solrClient = createSolrClient(conf);
 
     messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest -> 
0L);
 
@@ -78,14 +113,13 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
     // Create producer for resubmitting failed requests
     log.info("Creating Kafka resubmit producer");
-    this.kafkaMirroringSink = new KafkaMirroringSink(conf);
+    this.kafkaMirroringSink = createKafkaMirroringSink(conf);
     log.info("Created Kafka resubmit producer");
 
   }
 
-  public static KafkaConsumer<String, MirroredSolrRequest> 
createConsumer(Properties properties) {
-    return new KafkaConsumer<>(properties, new StringDeserializer(),
-        new MirroredSolrRequestSerializer());
+  public KafkaConsumer<String,MirroredSolrRequest> createConsumer(Properties 
properties) {
+    return new KafkaConsumer<>(properties, new StringDeserializer(), new 
MirroredSolrRequestSerializer());
   }
 
   /**
@@ -95,11 +129,14 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
    * 3. Send the request to the MirroredSolrRequestHandler that has the 
processing, retry, error handling logic.
    */
   @Override public void run() {
-    log.info("About to start Kafka consumer thread, topic={}", topicName);
+    log.info("About to start Kafka consumer thread, topics={}", 
Arrays.asList(topicNames));
 
     try {
 
-      consumer.subscribe(Collections.singleton(topicName));
+      consumer.subscribe(Arrays.asList((topicNames)));
+
+      log.info("Consumer started");
+      startLatch.countDown();
 
       while (pollAndProcessRequests()) {
         //no-op within this loop: everything is done in pollAndProcessRequests 
method defined above.
@@ -130,81 +167,113 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   boolean pollAndProcessRequests() {
     log.trace("Entered pollAndProcessRequests loop");
     try {
-      ConsumerRecords<String, MirroredSolrRequest> records =
-          consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+      for (TopicPartition partition : partitionWorkMap.keySet()) {
+        checkForOffsetUpdates(partition);
+      }
+
+      ConsumerRecords<String,MirroredSolrRequest> records = 
consumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
+
+      if (log.isTraceEnabled()) {
+        log.trace("poll return {} records", records.count());
+      }
+
+      UpdateRequest solrReqBatch = null;
+
+      ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
+
       for (TopicPartition partition : records.partitions()) {
-        List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords =
-            records.records(partition);
+        List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords = 
records.records(partition);
+
+        PartitionWork partitionWork = partitionWorkMap.compute(partition, (k, 
v) -> {
+          if (v == null) {
+            return new PartitionWork();
+          }
+          return v;
+        });
+        WorkUnit workUnit = new WorkUnit();
+        workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+        partitionWork.partitionQueue.add(workUnit);
         try {
-          for (ConsumerRecord<String, MirroredSolrRequest> record : 
partitionRecords) {
+          ModifiableSolrParams lastParams = null;
+          NamedList lastParamsAsNamedList = null;
+          solrReqBatch = new UpdateRequest();
+          for (ConsumerRecord<String,MirroredSolrRequest> requestRecord : 
partitionRecords) {
             if (log.isTraceEnabled()) {
-              log.trace("Fetched record from topic={} partition={} key={} 
value={}", record.topic(),
-                  record.partition(), record.key(), record.value());
+              log.trace("Fetched record from topic={} partition={} key={} 
value={}", requestRecord.topic(), requestRecord.partition(), 
requestRecord.key(),
+                  requestRecord.value());
             }
-            IQueueHandler.Result<MirroredSolrRequest> result = 
messageProcessor.handleItem(record.value());
-            switch (result.status()) {
-              case FAILED_RESUBMIT:
-                // currently, we use a strategy taken from an earlier working 
implementation
-                // of just resubmitting back to the queue - note that in rare 
cases, this could
-                // allow for incorrect update reorders
-                if (log.isTraceEnabled()) {
-                  log.trace("result=failed-resubmit");
-                }
-                metrics.counter("failed-resubmit").inc();
-                kafkaMirroringSink.submit(record.value());
-                break;
-              case HANDLED:
-                // no-op
-                if (log.isTraceEnabled()) {
-                  log.trace("result=handled");
-                }
-                metrics.counter("handled").inc();
-                break;
-              case NOT_HANDLED_SHUTDOWN:
-                if (log.isTraceEnabled()) {
-                  log.trace("result=nothandled_shutdown");
-                }
-                metrics.counter("nothandled_shutdown").inc();
-              case FAILED_RETRY:
-                log.error("Unexpected response while processing request. We 
never expect {}.",
-                    result.status().toString());
-                metrics.counter("failed-retry").inc();
-                break;
-              default:
-                if (log.isTraceEnabled()) {
-                  log.trace("result=no matching case");
-                }
-                // no-op
+
+            lastRecord = requestRecord;
+            MirroredSolrRequest req = requestRecord.value();
+            UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
+            ModifiableSolrParams params = solrReq.getParams();
+            if (log.isTraceEnabled()) {
+              log.trace("params={}", params);
+            }
+
+            if (lastParams != null && 
!lastParams.toNamedList().equals(params.toNamedList())) {
+              if (log.isTraceEnabled()) {
+                log.trace("SolrParams have changed, starting new 
UpdateRequest, params={}", params);
+              }
+              lastParamsAsNamedList = null;
+              sendBatch(solrReqBatch, lastRecord, workUnit);
+              solrReqBatch = new UpdateRequest();
+              workUnit = new WorkUnit();
+              workUnit.nextOffset = getOffsetForPartition(partitionRecords);
+              partitionWork.partitionQueue.add(workUnit);
+            }
+
+            lastParams = solrReq.getParams();
+            solrReqBatch.setParams(params);
+            if (lastParamsAsNamedList == null) {
+              lastParamsAsNamedList = lastParams.toNamedList();
+            }
+
+            List<SolrInputDocument> docs = solrReq.getDocuments();
+            if (docs != null) {
+              solrReqBatch.add(docs);
+            }
+            List<String> deletes = solrReq.getDeleteById();
+            if (deletes != null) {
+              solrReqBatch.deleteById(deletes);
+            }
+            List<String> deleteByQuery = solrReq.getDeleteQuery();
+            if (deleteByQuery != null) {
+              for (String delByQuery : deleteByQuery) {
+                solrReqBatch.deleteByQuery(delByQuery);
+              }
             }
+
           }
-          updateOffset(partition, partitionRecords);
+
+          sendBatch(solrReqBatch, lastRecord, workUnit);
+
+          checkForOffsetUpdates(partition);
 
           // handleItem sets the thread interrupt, let's exit if there has 
been an interrupt set
           if (Thread.currentThread().isInterrupted()) {
             log.info("Kafka Consumer thread interrupted, shutting down Kafka 
consumer.");
             return false;
           }
-        } catch (MirroringException e) {
-          // We don't really know what to do here, so it's wiser to just break 
out.
-          log.error(
-              "Mirroring exception occurred while resubmitting to Kafka. We 
are going to stop the consumer thread now.",
-              e);
-          return false;
         } catch (WakeupException e) {
           log.info("Caught wakeup exception, shutting down 
KafkaSolrRequestConsumer.");
           return false;
         } catch (Exception e) {
-          // If there is any exception returned by handleItem, then reset the 
offset.
+          // If there is any exception returned by handleItem, don't set the 
offset.
 
           if (e instanceof ClassCastException || e instanceof 
SerializationException) { // TODO: optional
             log.error("Non retryable error", e);
             break;
           }
-          log.warn("Exception occurred in Kafka consumer thread, but we will 
continue.", e);
-          resetOffsetForPartition(partition, partitionRecords);
+          log.error("Exception occurred in Kafka consumer thread, stopping the 
Consumer.", e);
           break;
         }
       }
+
+      for (TopicPartition partition : partitionWorkMap.keySet()) {
+        checkForOffsetUpdates(partition);
+      }
+
     } catch (WakeupException e) {
       log.info("Caught wakeup exception, shutting down 
KafkaSolrRequestConsumer");
       return false;
@@ -220,14 +289,94 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     return true;
   }
 
+  /**
+   * Submits one batched update to the worker pool and registers the pending task on
+   * {@code workUnit} so its completion can be observed later.
+   * <p>
+   * The task wraps the batch in a {@link MirroredSolrRequest}, passes it to the message
+   * processor and forwards the outcome to {@code processResult}.
+   * NOTE(review): on a resubmit outcome {@code processResult} resubmits only
+   * {@code lastRecord}'s request, not the whole batch - confirm that is intended.
+   *
+   * @param solrReqBatch the accumulated update to apply
+   * @param lastRecord   the record handed to {@code processResult} along with the outcome
+   * @param workUnit     the unit that collects the task's {@link Future}
+   */
+  public void sendBatch(UpdateRequest solrReqBatch, ConsumerRecord<String,MirroredSolrRequest> lastRecord, WorkUnit workUnit) {
+    Runnable task = () -> {
+      IQueueHandler.Result<MirroredSolrRequest> outcome = messageProcessor.handleItem(new MirroredSolrRequest(solrReqBatch));
+      try {
+        processResult(lastRecord, outcome);
+      } catch (MirroringException e) {
+        // Surfaces through the Future as an ExecutionException.
+        log.error("Mirroring exception occurred while resubmitting to Kafka. We are going to stop the consumer thread now.", e);
+        throw new RuntimeException(e);
+      }
+    };
+    workUnit.workItems.add(executor.submit(task));
+  }
+
+  /**
+   * Commits offsets for the finished work units of {@code partition}, oldest first.
+   * <p>
+   * Units are examined in queue order. As soon as the head unit still has an unfinished
+   * task (or no task at all) the method returns, so offsets are never committed past work
+   * that has not completed. A unit whose tasks have all finished is removed from the queue
+   * exactly once and its {@code nextOffset} is committed. The loop ends when the queue is
+   * empty; the previous form kept looping on an empty queue and never returned.
+   *
+   * @param partition the partition whose pending work units are checked
+   * @throws RuntimeException if a finished task failed; the failed unit is removed from
+   *                          the queue before the exception is thrown
+   */
+  private void checkForOffsetUpdates(TopicPartition partition) {
+    PartitionWork work = partitionWorkMap.get(partition);
+    if (work == null) {
+      return;
+    }
+
+    WorkUnit workUnit;
+    while ((workUnit = work.partitionQueue.peek()) != null) {
+      if (workUnit.workItems.isEmpty()) {
+        // No task was ever submitted for this unit, so there is no completed work to commit.
+        return;
+      }
+      for (Future<?> future : workUnit.workItems) {
+        if (!future.isDone()) {
+          if (log.isTraceEnabled()) {
+            log.trace("Future for update is not done topic={}", partition.topic());
+          }
+          return;
+        }
+        if (log.isTraceEnabled()) {
+          log.trace("Future for update is done topic={}", partition.topic());
+        }
+        try {
+          // The future is already done; get() is only called to surface a task failure.
+          future.get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+          log.error("Exception resubmitting updates to Kafka, stopping the Consumer thread", e);
+          work.partitionQueue.poll();
+          throw new RuntimeException("Exception resubmitting updates to Kafka, stopping the Consumer thread", e);
+        }
+      }
+
+      // Every task of the unit finished: drop the unit once, then commit its offset.
+      work.partitionQueue.poll();
+      updateOffset(partition, workUnit.nextOffset);
+    }
+  }
+
+  /**
+   * Reacts to the outcome of handing a request to the message processor: increments the
+   * metrics counter for the status and, for {@code FAILED_RESUBMIT}, submits
+   * {@code record}'s value to the Kafka mirroring sink again.
+   *
+   * @param record the consumer record whose value is resubmitted on {@code FAILED_RESUBMIT}
+   * @param result the processing result whose status selects the branch
+   * @throws MirroringException presumably raised by {@code kafkaMirroringSink.submit} - verify
+   */
+  void processResult(ConsumerRecord<String,MirroredSolrRequest> record, 
IQueueHandler.Result<MirroredSolrRequest> result) throws MirroringException {
+    switch (result.status()) {
+      case FAILED_RESUBMIT:
+        if (log.isTraceEnabled()) {
+          log.trace("result=failed-resubmit");
+        }
+        metrics.counter("failed-resubmit").inc();
+        kafkaMirroringSink.submit(record.value());
+        break;
+      case HANDLED:
+        // no-op
+        if (log.isTraceEnabled()) {
+          log.trace("result=handled");
+        }
+        metrics.counter("handled").inc();
+        break;
+      case NOT_HANDLED_SHUTDOWN:
+        if (log.isTraceEnabled()) {
+          log.trace("result=nothandled_shutdown");
+        }
+        metrics.counter("nothandled_shutdown").inc();
+        // NOTE(review): no break here, so NOT_HANDLED_SHUTDOWN also runs the FAILED_RETRY
+        // branch below (error log plus the "failed-retry" counter) - confirm this is intended.
+      case FAILED_RETRY:
+        log.error("Unexpected response while processing request. We never 
expect {}.", result.status().toString());
+        metrics.counter("failed-retry").inc();
+        break;
+      default:
+        if (log.isTraceEnabled()) {
+          log.trace("result=no matching case");
+        }
+        // no-op
+    }
+  }
+
   /**
    * Reset the local offset so that the consumer reads the records from Kafka 
again.
    *
    * @param partition        The TopicPartition to reset the offset for
    * @param partitionRecords PartitionRecords for the specified partition
    */
-  private void resetOffsetForPartition(TopicPartition partition,
-      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
+  private void resetOffsetForPartition(TopicPartition partition, 
List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
     if (log.isTraceEnabled()) {
       log.trace("Resetting offset to: {}", partitionRecords.get(0).offset());
     }
@@ -238,32 +387,56 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   /**
    * Logs and updates the commit point for the partition that has been 
processed.
    *
-   * @param partition        The TopicPartition to update the offset for
-   * @param partitionRecords PartitionRecords for the specified partition
+   * @param partition  The TopicPartition to update the offset for
+   * @param nextOffset The next offset to commit for this partition.
    */
-  private void updateOffset(TopicPartition partition,
-      List<ConsumerRecord<String, MirroredSolrRequest>> partitionRecords) {
-    long nextOffset = partitionRecords.get(partitionRecords.size() - 
1).offset() + 1;
-
+  private void updateOffset(TopicPartition partition, long nextOffset) {
     if (log.isTraceEnabled()) {
-      log.trace("Updated offset for topic={} partition={} to offset={}", 
partition.topic(),
-          partition.partition(), nextOffset);
+      log.trace("Updated offset for topic={} partition={} to offset={}", 
partition.topic(), partition.partition(), nextOffset);
     }
 
     consumer.commitSync(Collections.singletonMap(partition, new 
OffsetAndMetadata(nextOffset)));
   }
 
+  /**
+   * Computes the offset that follows the final record of a polled batch.
+   *
+   * @param partitionRecords the records fetched for one partition; must not be empty
+   * @return the offset of the last record in the list plus one
+   */
+  private static long getOffsetForPartition(List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords) {
+    int lastIndex = partitionRecords.size() - 1;
+    ConsumerRecord<String,MirroredSolrRequest> newest = partitionRecords.get(lastIndex);
+    return newest.offset() + 1;
+  }
+
   /**
    * Shutdown the Kafka consumer by calling wakeup.
    */
   public final void shutdown() {
+    consumer.wakeup();
     log.info("Shutdown called on KafkaCrossDcConsumer");
     try {
+      if (!executor.isShutdown()) {
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+      }
       solrClient.close();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      log.warn("Interrupted while waiting for executor to shutdown");
     } catch (Exception e) {
-      log.warn("Exception closing Solr client on shutdown");
+      log.warn("Exception closing Solr client on shutdown", e);
     }
-    consumer.wakeup();
   }
 
+  /**
+   * Builds the {@link CloudSolrClient} used by this consumer from the ZooKeeper connect
+   * string stored in {@code conf}.
+   * NOTE(review): protected, presumably so tests can substitute a client - confirm.
+   *
+   * @param conf configuration supplying {@code ZK_CONNECT_STRING}
+   * @return a newly built client
+   */
+  protected CloudSolrClient createSolrClient(KafkaCrossDcConf conf) {
+    return new 
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
 Optional.empty()).build();
+  }
+
+  /**
+   * Creates the {@link KafkaMirroringSink} for this consumer from {@code conf}.
+   * NOTE(review): protected, presumably so tests can substitute a sink - confirm.
+   *
+   * @param conf configuration passed to the sink's constructor
+   * @return a newly constructed sink
+   */
+  protected KafkaMirroringSink createKafkaMirroringSink(KafkaCrossDcConf conf) 
{
+    return new KafkaMirroringSink(conf);
+  }
+
+  /** Per-partition holder for the queue of {@link WorkUnit}s, kept in insertion order. */
+  private static class PartitionWork {
+    // Queue of work units recorded for one partition.
+    private final Queue<WorkUnit> partitionQueue = new LinkedList<>();
+  }
+
+  /** Groups the futures of submitted update tasks with the offset to commit for them. */
+  static class WorkUnit {
+    // Futures of the update tasks submitted for this unit.
+    Set<Future<?>> workItems = new HashSet<>();
+    // Offset passed to the offset commit for this unit.
+    long nextOffset;
+  }
 }
diff --git 
a/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
new file mode 100644
index 0000000..9ff2a8e
--- /dev/null
+++ 
b/crossdc-consumer/src/test/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumerTest.java
@@ -0,0 +1,151 @@
+package org.apache.solr.crossdc.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.crossdc.common.IQueueHandler;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.common.KafkaMirroringSink;
+import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyList;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class KafkaCrossDcConsumerTest {
+
+  private KafkaCrossDcConsumer kafkaCrossDcConsumer;
+  private KafkaConsumer<String,MirroredSolrRequest> kafkaConsumerMock;
+  private CloudSolrClient solrClientMock;
+  private KafkaMirroringSink kafkaMirroringSinkMock;
+
+  private SolrMessageProcessor messageProcessorMock;
+
+  @Before public void setUp() {
+    kafkaConsumerMock = mock(KafkaConsumer.class);
+    solrClientMock = mock(CloudSolrClient.class);
+    kafkaMirroringSinkMock = mock(KafkaMirroringSink.class);
+    messageProcessorMock = mock(SolrMessageProcessor.class);
+    KafkaCrossDcConf conf = testCrossDCConf();
+    // Set necessary configurations
+
+    kafkaCrossDcConsumer = new KafkaCrossDcConsumer(conf, new 
CountDownLatch(0)) {
+      @Override public KafkaConsumer<String,MirroredSolrRequest> 
createConsumer(Properties properties) {
+        return kafkaConsumerMock;
+      }
+
+      @Override protected CloudSolrClient createSolrClient(KafkaCrossDcConf 
conf) {
+        return solrClientMock;
+      }
+
+      @Override protected KafkaMirroringSink 
createKafkaMirroringSink(KafkaCrossDcConf conf) {
+        return kafkaMirroringSinkMock;
+      }
+    };
+  }
+
+  private static KafkaCrossDcConf testCrossDCConf() {
+    Map config = new HashMap<>();
+    config.put(KafkaCrossDcConf.TOPIC_NAME, "topic1");
+    config.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, "localhost:9092");
+    KafkaCrossDcConf conf = new KafkaCrossDcConf(config);
+    return conf;
+  }
+
+  @After public void tearDown() {
+    kafkaCrossDcConsumer.shutdown();
+  }
+
+  @Test public void testRunAndShutdown() throws Exception {
+    // Define the expected behavior of the mocks and set up the test scenario
+    when(kafkaConsumerMock.poll(any())).thenReturn(new 
ConsumerRecords<>(Collections.emptyMap()));
+
+    ExecutorService consumerThreadExecutor = 
Executors.newSingleThreadExecutor();
+
+    // Run the test
+    consumerThreadExecutor.submit(kafkaCrossDcConsumer);
+
+    // Run the shutdown method
+    kafkaCrossDcConsumer.shutdown();
+
+    // Verify that the consumer was subscribed with the correct topic names
+    verify(kafkaConsumerMock).subscribe(anyList());
+
+    // Verify that the appropriate methods were called on the mocks
+    verify(kafkaConsumerMock).wakeup();
+    verify(solrClientMock).close();
+
+    consumerThreadExecutor.shutdown();
+    consumerThreadExecutor.awaitTermination(10, TimeUnit.SECONDS);
+  }
+
+  @Test public void testHandleFailedResubmit() throws Exception {
+    // Set up the KafkaCrossDcConsumer
+    KafkaCrossDcConf testConf = testCrossDCConf();
+    KafkaCrossDcConsumer consumer = spy(new KafkaCrossDcConsumer(testConf, new 
CountDownLatch(0)));
+    doNothing().when(consumer).sendBatch(any(UpdateRequest.class), 
any(ConsumerRecord.class), any(KafkaCrossDcConsumer.WorkUnit.class));
+
+    // Set up the SolrMessageProcessor mock
+    SolrMessageProcessor mockMessageProcessor = 
mock(SolrMessageProcessor.class);
+    IQueueHandler.Result<MirroredSolrRequest> failedResubmitResult = new 
IQueueHandler.Result<>(IQueueHandler.ResultStatus.FAILED_RESUBMIT, null);
+    
when(mockMessageProcessor.handleItem(any(MirroredSolrRequest.class))).thenReturn(failedResubmitResult);
+
+    // Mock the KafkaMirroringSink
+    KafkaMirroringSink mockKafkaMirroringSink = mock(KafkaMirroringSink.class);
+    
doNothing().when(mockKafkaMirroringSink).submit(any(MirroredSolrRequest.class));
+    consumer.kafkaMirroringSink = mockKafkaMirroringSink;
+
+    // Call the method to test
+    ConsumerRecord<String,MirroredSolrRequest> record = 
createSampleConsumerRecord();
+    consumer.processResult(record, failedResubmitResult);
+
+    // Verify that the KafkaMirroringSink.submit() method was called
+    verify(consumer.kafkaMirroringSink, times(1)).submit(record.value());
+  }
+
+  private ConsumerRecord<String,MirroredSolrRequest> 
createSampleConsumerRecord() {
+    return new ConsumerRecord<>("sample-topic", 0, 0, "key", 
createSampleMirroredSolrRequest());
+  }
+
+  private ConsumerRecords<String,MirroredSolrRequest> 
createSampleConsumerRecords() {
+    TopicPartition topicPartition = new TopicPartition("sample-topic", 0);
+    List<ConsumerRecord<String,MirroredSolrRequest>> recordsList = new 
ArrayList<>();
+    recordsList.add(new ConsumerRecord<>("sample-topic", 0, 0, "key", 
createSampleMirroredSolrRequest()));
+    return new ConsumerRecords<>(Collections.singletonMap(topicPartition, 
recordsList));
+  }
+
+  private MirroredSolrRequest createSampleMirroredSolrRequest() {
+    // Create a sample MirroredSolrRequest for testing
+    SolrInputDocument solrInputDocument = new SolrInputDocument();
+    solrInputDocument.addField("id", "1");
+    solrInputDocument.addField("title", "Sample title");
+    solrInputDocument.addField("content", "Sample content");
+    UpdateRequest updateRequest = new UpdateRequest();
+    updateRequest.add(solrInputDocument);
+    return new MirroredSolrRequest(updateRequest);
+  }
+}
diff --git a/crossdc-producer/build.gradle b/crossdc-producer/build.gradle
index 4a989e4..9a6e5cd 100644
--- a/crossdc-producer/build.gradle
+++ b/crossdc-producer/build.gradle
@@ -36,15 +36,14 @@ sourceSets {
 dependencies {
     implementation project(':crossdc-consumer')
     implementation project(path: ':crossdc-commons', configuration: 'shadow')
+    testImplementation project(path: ':crossdc-commons')
 
     provided  group: 'org.apache.solr', name: 'solr-core', version: '8.11.2'
 
     testImplementation 'org.slf4j:slf4j-api'
     testImplementation 'org.hamcrest:hamcrest:2.2'
     testImplementation 'junit:junit:4.13.2'
-    testImplementation('org.mockito:mockito-core:4.3.1', {
-        exclude group: "net.bytebuddy", module: "byte-buddy-agent"
-    })
+    testImplementation('org.mockito:mockito-inline:4.3.1')
     testImplementation group: 'org.apache.solr', name: 'solr-core', version: 
'8.11.2'
     testImplementation group: 'org.apache.solr', name: 'solr-test-framework', 
version: '8.11.2'
 
diff --git 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 74ee1eb..dc2e561 100644
--- 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -29,10 +29,10 @@ import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
 
+// review this code for bugs and performance improvements
 public class MirroringUpdateProcessor extends UpdateRequestProcessor {
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -42,14 +42,12 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
    * necessary to prevent circular mirroring between coupled cluster running 
this processor.
    */
   private final boolean doMirroring;
-  private final RequestMirroringHandler requestMirroringHandler;
+  final RequestMirroringHandler requestMirroringHandler;
 
   /**
    * The mirrored request starts as null, gets created and appended to at each 
process() call,
    * then submitted on finish().
    */
-  private UpdateRequest mirrorRequest;
-  private long mirrorRequestBytes;
   private final SolrParams mirrorParams;
 
 
@@ -57,7 +55,8 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
    * Controls whether docs exceeding the max-size (and thus cannot be 
mirrored) are indexed locally.
    */
   private final boolean indexUnmirrorableDocs;
-  private final long maxMirroringBatchSizeBytes;
+
+  private final long maxMirroringDocSizeBytes;
 
 
   /**
@@ -79,7 +78,7 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
     super(next);
     this.doMirroring = doMirroring;
     this.indexUnmirrorableDocs = indexUnmirrorableDocs;
-    this.maxMirroringBatchSizeBytes = maxMirroringBatchSizeBytes;
+    this.maxMirroringDocSizeBytes = maxMirroringBatchSizeBytes;
     this.mirrorParams = mirroredReqParams;
     this.distribPhase = distribPhase;
     this.requestMirroringHandler = requestMirroringHandler;
@@ -88,43 +87,47 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
 
   }
 
-  private UpdateRequest createAndOrGetMirrorRequest() {
-    if (mirrorRequest == null) {
-      mirrorRequest = new UpdateRequest();
+  UpdateRequest createMirrorRequest() {
+    UpdateRequest mirrorRequest = new UpdateRequest();
       mirrorRequest.setParams(new ModifiableSolrParams(mirrorParams));
-      mirrorRequestBytes = 0L;
-    }
-    if (log.isDebugEnabled())
-      log.debug("createOrGetMirrorRequest={}",
-          mirrorRequest);
-    return mirrorRequest;
+      return mirrorRequest;
   }
 
   @Override public void processAdd(final AddUpdateCommand cmd) throws 
IOException {
-
+    UpdateRequest mirrorRequest = createMirrorRequest();
     final SolrInputDocument doc = cmd.getSolrInputDocument().deepCopy();
     doc.removeField(CommonParams.VERSION_FIELD); // strip internal doc version
     final long estimatedDocSizeInBytes = ObjectSizeEstimator.estimate(doc);
-    final boolean tooLargeForKafka = estimatedDocSizeInBytes > 
maxMirroringBatchSizeBytes;
+    log.info("estimated doc size is {} bytes, max size is {}", 
estimatedDocSizeInBytes, maxMirroringDocSizeBytes);
+    final boolean tooLargeForKafka = estimatedDocSizeInBytes > 
maxMirroringDocSizeBytes;
     if (tooLargeForKafka && !indexUnmirrorableDocs) {
-      log.warn("Skipping indexing of doc {} as it exceeds the doc-size limit 
({} bytes) and is unmirrorable.", cmd.getPrintableId(), 
maxMirroringBatchSizeBytes);
-    } else {
-      super.processAdd(cmd); // let this throw to prevent mirroring invalid 
reqs
+      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Update 
exceeds the doc-size limit and is unmirrorable. id="
+
+          + cmd.getPrintableId() + " doc size=" + estimatedDocSizeInBytes + " 
maxDocSize=" + maxMirroringDocSizeBytes);
+    } else if (tooLargeForKafka) {
+      log.warn(
+          "Skipping mirroring of doc {} as it exceeds the doc-size limit ({} 
bytes) and is unmirrorable. doc size={}",
+          cmd.getPrintableId(), maxMirroringDocSizeBytes, 
estimatedDocSizeInBytes);
     }
 
+    super.processAdd(cmd); // let this throw to prevent mirroring invalid reqs
+
     // submit only from the leader shards so we mirror each doc once
     boolean isLeader = isLeader(cmd.getReq(),  cmd.getIndexedIdStr(), null, 
cmd.getSolrInputDocument());
-    if (doMirroring && isLeader) {
-      if (tooLargeForKafka) {
-        log.error("Skipping mirroring of doc {} because estimated size exceeds 
batch size limit {} bytes", cmd.getPrintableId(), maxMirroringBatchSizeBytes);
-      } else {
-        createAndOrGetMirrorRequest().add(doc, cmd.commitWithin, 
cmd.overwrite);
-        mirrorRequestBytes += estimatedDocSizeInBytes;
+    if (!tooLargeForKafka && doMirroring && isLeader) {
+
+      mirrorRequest.add(doc, cmd.commitWithin, cmd.overwrite);
+
+      try {
+        requestMirroringHandler.mirror(mirrorRequest);
+      } catch (Exception e) {
+        log.error("mirror submit failed", e);
+        throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
       }
     }
 
     if (log.isDebugEnabled())
-      log.debug("processAdd isLeader={} cmd={}", isLeader, cmd);
+      log.debug("processAdd isLeader={} doMirroring={} tooLargeForKafka={} 
cmd={}", isLeader, doMirroring, tooLargeForKafka, cmd);
   }
 
   @Override public void processDelete(final DeleteUpdateCommand cmd) throws 
IOException {
@@ -175,12 +178,21 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
 
     if (doMirroring) {
       boolean isLeader = false;
+      UpdateRequest mirrorRequest = createMirrorRequest();
       if (cmd.isDeleteById()) {
         // deleteById requests runs once per leader, so we just submit the 
request from the leader shard
         isLeader = isLeader(cmd.getReq(),  ((DeleteUpdateCommand)cmd).getId(), 
null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
             ShardParams._ROUTE_), null);
         if (isLeader) {
-          createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip 
versions from deletes
+
+          mirrorRequest.deleteById(cmd.getId()); // strip versions from deletes
+
+          try {
+            requestMirroringHandler.mirror(mirrorRequest);
+          } catch (Exception e) {
+            log.error("mirror submit failed", e);
+            throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+          }
         }
         if (log.isDebugEnabled())
           log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, 
isLeader, cmd);
@@ -191,7 +203,14 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
         // TODO: Can we actually support this considering DBQs aren't 
versioned.
 
         if (distribPhase == DistributedUpdateProcessor.DistribPhase.NONE) {
-          createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
+          mirrorRequest.deleteByQuery(cmd.query);
+
+          try {
+            requestMirroringHandler.mirror(mirrorRequest);
+          } catch (Exception e) {
+            log.error("mirror submit failed", e);
+            throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
+          }
         }
         if (log.isDebugEnabled())
           log.debug("processDelete doMirroring={} cmd={}", true, cmd);
@@ -214,7 +233,7 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
     }
   }
 
-  private boolean isLeader(SolrQueryRequest req, String id, String route, 
SolrInputDocument doc) {
+  boolean isLeader(SolrQueryRequest req, String id, String route, 
SolrInputDocument doc) {
     CloudDescriptor cloudDesc =
         req.getCore().getCoreDescriptor().getCloudDescriptor();
     String collection = cloudDesc.getCollectionName();
@@ -256,29 +275,6 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
 
   @Override public final void finish() throws IOException {
     super.finish();
-
-    if (doMirroring && mirrorRequest != null) {
-      // We are configured to mirror, but short-circuit on batches we already 
know will fail (because they cumulatively
-      // exceed the mirroring max-size)
-      if (mirrorRequestBytes > maxMirroringBatchSizeBytes) {
-        final String batchedIds = mirrorRequest.getDocuments().stream()
-                .map(doc -> doc.getField("id").getValue().toString())
-                .collect(Collectors.joining(", "));
-        log.warn("Mirroring skipped for request because batch size {} bytes 
exceeds limit {} bytes.  IDs: {}",
-                mirrorRequestBytes, maxMirroringBatchSizeBytes, batchedIds);
-        mirrorRequest = null;
-        mirrorRequestBytes = 0L;
-        return;
-      }
-
-      try {
-        requestMirroringHandler.mirror(mirrorRequest);
-        mirrorRequest = null; // so we don't accidentally submit it again
-      } catch (Exception e) {
-        log.error("mirror submit failed", e);
-        throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
-      }
-    }
   }
 
   // package private for testing
diff --git 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 06f7dba..4f571cf 100644
--- 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -60,9 +60,6 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
         implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
 
     private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-    public static final NoOpUpdateRequestProcessor 
NO_OP_UPDATE_REQUEST_PROCESSOR =
-        new NoOpUpdateRequestProcessor();
-
     // Flag for mirroring requests
     public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
 
@@ -71,7 +68,7 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
 
 
     private boolean enabled = true;
-    private boolean indexUnmirrorableDocs = false;
+
     private KafkaCrossDcConf conf;
 
     private final Map<String,Object> properties = new HashMap<>();
@@ -85,11 +82,6 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
             this.enabled = false;
         }
 
-        final Boolean indexUnmirrorableDocsArg = 
args.getBooleanArg("indexUnmirrorableDocs");
-        if (indexUnmirrorableDocsArg != null && indexUnmirrorableDocsArg) {
-            this.indexUnmirrorableDocs = true;
-        }
-
         for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
             String val = args._getStr(configKey.getKey(), null);
             if (val != null && !val.isBlank()) {
@@ -210,7 +202,7 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
                                                 final UpdateRequestProcessor 
next) {
 
         if (!enabled) {
-            return NO_OP_UPDATE_REQUEST_PROCESSOR;
+            return new NoOpUpdateRequestProcessor(next);
         }
 
         // if the class fails to initialize
@@ -221,6 +213,7 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
         // Check if mirroring is disabled in request params, defaults to true
         boolean doMirroring = req.getParams().getBool(SERVER_SHOULD_MIRROR, 
true);
         final long maxMirroringBatchSizeBytes = 
conf.getInt(MAX_REQUEST_SIZE_BYTES);
+        Boolean indexUnmirrorableDocs = conf.getBool(INDEX_UNMIRRORABLE_DOCS);
 
         ModifiableSolrParams mirroredParams = null;
         if (doMirroring) {
@@ -256,8 +249,8 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
     }
 
     private static class NoOpUpdateRequestProcessor extends 
UpdateRequestProcessor {
-        NoOpUpdateRequestProcessor() {
-            super(null);
+        NoOpUpdateRequestProcessor(UpdateRequestProcessor next) {
+            super(next);
         }
     }
 
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 7cba50a..ebdd879 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -14,6 +14,7 @@ import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrClient;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.BaseCloudSolrClient;
 import org.apache.solr.client.solrj.impl.BaseHttpSolrClient;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
@@ -27,10 +28,7 @@ import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.sys.Prop;
@@ -44,6 +42,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
 import static org.mockito.Mockito.spy;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { 
SolrIgnoredThreadsFilter.class,
@@ -53,8 +52,7 @@ import static org.mockito.Mockito.spy;
 
   private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private static final int MAX_MIRROR_BATCH_SIZE_BYTES = 
Integer.valueOf(DEFAULT_MAX_REQUEST_SIZE);
-  private static final int MAX_DOC_SIZE_BYTES = MAX_MIRROR_BATCH_SIZE_BYTES;
+  private static final int MAX_DOC_SIZE_BYTES = 
Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE);
 
   static final String VERSION_FIELD = "_version_";
 
@@ -64,19 +62,24 @@ import static org.mockito.Mockito.spy;
   protected static volatile MiniSolrCloudCluster solrCluster1;
   protected static volatile MiniSolrCloudCluster solrCluster2;
 
-  protected static volatile Consumer consumer = new Consumer();
+  protected static volatile Consumer consumer;
 
   private static String TOPIC = "topic1";
 
   private static String COLLECTION = "collection1";
   private static String ALT_COLLECTION = "collection2";
+  private static Thread.UncaughtExceptionHandler uceh;
 
   @BeforeClass
   public static void beforeSolrAndKafkaIntegrationTest() throws Exception {
-
+    uceh = Thread.getDefaultUncaughtExceptionHandler();
+    Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+      log.error("Uncaught exception in thread " + t, e);
+    });
+    consumer = new Consumer();
     Properties config = new Properties();
-    config.put("unclean.leader.election.enable", "true");
-    config.put("enable.partition.eof", "false");
+    //config.put("unclean.leader.election.enable", "true");
+    //config.put("enable.partition.eof", "false");
 
     kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
       public String bootstrapServers() {
@@ -89,6 +92,7 @@ import static org.mockito.Mockito.spy;
 
     System.setProperty("topicName", TOPIC);
     System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+    System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
 
     solrCluster1 = new SolrCloudTestCase.Builder(1, 
createTempDir()).addConfig("conf",
         
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
@@ -127,14 +131,6 @@ import static org.mockito.Mockito.spy;
   public static void afterSolrAndKafkaIntegrationTest() throws Exception {
     ObjectReleaseTracker.clear();
 
-    consumer.shutdown();
-
-    try {
-      kafkaCluster.stop();
-    } catch (Exception e) {
-      log.error("Exception stopping Kafka cluster", e);
-    }
-
     if (solrCluster1 != null) {
       solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster1.shutdown();
@@ -143,21 +139,19 @@ import static org.mockito.Mockito.spy;
       solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
       solrCluster2.shutdown();
     }
-  }
 
-  @After
-  public void tearDown() throws Exception {
-    super.tearDown();
-    solrCluster1.getSolrClient().deleteByQuery(COLLECTION, "*:*");
-    solrCluster2.getSolrClient().deleteByQuery(COLLECTION, "*:*");
-    solrCluster1.getSolrClient().commit(COLLECTION);
-    solrCluster2.getSolrClient().commit(COLLECTION);
+    consumer.shutdown();
+    consumer = null;
 
-    // Delete alternate collection in case it was created by any tests.
-    if 
(CollectionAdminRequest.listCollections(solrCluster1.getSolrClient()).contains(ALT_COLLECTION))
 {
-      
solrCluster1.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
-      
solrCluster2.getSolrClient().request(CollectionAdminRequest.deleteCollection(ALT_COLLECTION));
+    try {
+      kafkaCluster.deleteAllTopicsAndWait(5000);
+      kafkaCluster.stop();
+      kafkaCluster = null;
+    } catch (Exception e) {
+      log.error("Exception stopping Kafka cluster", e);
     }
+
+    Thread.setDefaultUncaughtExceptionHandler(uceh);
   }
 
   public void testFullCloudToCloud() throws Exception {
@@ -175,11 +169,18 @@ import static org.mockito.Mockito.spy;
     assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
   }
 
+  private static SolrInputDocument getDoc() {
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.nanoTime()));
+    doc.addField("text", "some test");
+    return doc;
+  }
+
   public void testProducerToCloud() throws Exception {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
     properties.put("acks", "all");
-    properties.put("retries", 0);
+    properties.put("retries", 1);
     properties.put("batch.size", 1);
     properties.put("buffer.memory", 33554432);
     properties.put("linger.ms", 1);
@@ -210,19 +211,25 @@ import static org.mockito.Mockito.spy;
   @Test
   public void testMirroringUpdateProcessor() throws Exception {
     final SolrInputDocument tooLargeDoc = new SolrInputDocument();
-    tooLargeDoc.addField("id", "tooLarge-" + 
String.valueOf(System.currentTimeMillis()));
+    tooLargeDoc.addField("id", System.nanoTime());
     tooLargeDoc.addField("text", new String(new byte[2 * MAX_DOC_SIZE_BYTES]));
     final SolrInputDocument normalDoc = new SolrInputDocument();
-    normalDoc.addField("id", "normalDoc-" + 
String.valueOf(System.currentTimeMillis()));
+    normalDoc.addField("id", System.nanoTime() + 22);
     normalDoc.addField("text", "Hello world");
-    final List<SolrInputDocument> docsToIndex = new ArrayList<>();
-    docsToIndex.add(tooLargeDoc);
+    List<SolrInputDocument> docsToIndex = new ArrayList<>();
     docsToIndex.add(normalDoc);
+    docsToIndex.add(tooLargeDoc);
 
     final CloudSolrClient cluster1Client = solrCluster1.getSolrClient();
-    cluster1Client.add(docsToIndex);
+    try {
+      cluster1Client.add(docsToIndex);
+    } catch (BaseCloudSolrClient.RouteException e) {
+      // expected
+    }
     cluster1Client.commit(COLLECTION);
 
+
+
     // Primary and secondary should each only index 'normalDoc'
     final String normalDocQuery = "id:" + normalDoc.get("id").getFirstValue();
     assertCluster2EventuallyHasDocs(COLLECTION, normalDocQuery, 1);
@@ -232,38 +239,42 @@ import static org.mockito.Mockito.spy;
 
     // Create new primary+secondary collection where 'tooLarge' docs ARE 
indexed on the primary
     CollectionAdminRequest.Create create =
-            CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 
1)
-                    .withProperty("indexUnmirrorableDocs", "true");
-    solrCluster1.getSolrClient().request(create);
-    solrCluster2.getSolrClient().request(create);
-    solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
-    solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
-
-    cluster1Client.add(ALT_COLLECTION, docsToIndex);
-    cluster1Client.commit(ALT_COLLECTION);
-
-    // Primary should have both 'normal' and 'large' docs; secondary should 
only have 'normal' doc.
-    assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 2);
-    assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
-    assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
-
-    // Index batch of docs that will exceed the max mirroring batch size 
cumulatively (but not individually)
-    // Batch consists of 100 docs each roughly 1/100th of the max-batch-size
-    docsToIndex.clear();
-    for (int i = 0; i < 100; i++) {
-      final SolrInputDocument doc = new SolrInputDocument();
-      doc.addField("id", "cumulativelyTooLarge-" + System.currentTimeMillis() 
+ "-" + i);
-      doc.addField("cumulativelyTooLarge_b", "true");
-      doc.addField("text", new String(new byte[MAX_MIRROR_BATCH_SIZE_BYTES / 
100]));
-      docsToIndex.add(doc);
+        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1)
+            .withProperty("indexUnmirrorableDocs", "true");
+    try {
+      solrCluster1.getSolrClient().request(create);
+      solrCluster2.getSolrClient().request(create);
+      solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+      solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+      cluster1Client.add(ALT_COLLECTION, docsToIndex);
+      cluster1Client.commit(ALT_COLLECTION);
+
+      // try adding another doc
+//      final SolrInputDocument newDoc = new SolrInputDocument();
+//
+//      newDoc.addField("id", System.nanoTime());
+//      newDoc.addField("text", "Hello world");
+//      docsToIndex = new ArrayList<>();
+//      docsToIndex.add(newDoc);
+//
+//    try {
+//      cluster1Client.add(ALT_COLLECTION, docsToIndex);
+//    } catch (BaseCloudSolrClient.RouteException e) {
+//      // expected
+//    }
+//      cluster1Client.commit(ALT_COLLECTION);
+
+      // Primary should have both 'normal' and 'large' docs; secondary should 
only have 'normal' doc.
+      assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 2);
+      assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1);
+      assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1);
+    } finally {
+      CollectionAdminRequest.Delete delete =
+        CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+      solrCluster1.getSolrClient().request(delete);
+      solrCluster2.getSolrClient().request(delete);
     }
-    cluster1Client.add(ALT_COLLECTION, docsToIndex);
-    cluster1Client.commit(ALT_COLLECTION);
-
-    final String cumulativelyTooLargeQuery = "cumulativelyTooLarge_b:true";
-    // Primary (but not secondary) should have 100 additional docs
-    assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, 
cumulativelyTooLargeQuery, 100);
-    assertCluster2EventuallyHasDocs(ALT_COLLECTION, cumulativelyTooLargeQuery, 
0);
   }
 
   private void assertCluster2EventuallyHasDocs(String collection, String 
query, int expectedNumDocs) throws Exception {
@@ -283,13 +294,13 @@ import static org.mockito.Mockito.spy;
   private void assertClusterEventuallyHasDocs(SolrClient client, String 
collection, String query, int expectedNumDocs) throws Exception {
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 20; i++) {
       client.commit(collection);
       results = client.query(collection, new SolrQuery(query));
       if (results.getResults().getNumFound() == expectedNumDocs) {
         foundUpdates = true;
       } else {
-        Thread.sleep(100);
+        Thread.sleep(200);
       }
     }
 
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
new file mode 100644
index 0000000..8027aae
--- /dev/null
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaMultiCollectionIntegrationTest.java
@@ -0,0 +1,222 @@
+package org.apache.solr.crossdc;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.lucene.util.QuickPatchThreadsFilter;
+import org.apache.solr.SolrIgnoredThreadsFilter;
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.cloud.MiniSolrCloudCluster;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
+import org.apache.solr.crossdc.consumer.Consumer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE;
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS;
+
+@ThreadLeakFilters(defaultFilters = true, filters = { 
SolrIgnoredThreadsFilter.class,
+    QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
+@ThreadLeakLingering(linger = 5000) public class 
SolrAndKafkaMultiCollectionIntegrationTest extends
+    SolrTestCaseJ4 {
+
+  private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static final int MAX_DOC_SIZE_BYTES = 
Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE);
+
+  static final String VERSION_FIELD = "_version_";
+
+  private static final int NUM_BROKERS = 1;
+  public EmbeddedKafkaCluster kafkaCluster;
+
+  protected volatile MiniSolrCloudCluster solrCluster1;
+  protected volatile MiniSolrCloudCluster solrCluster2;
+
+  protected static volatile Consumer consumer;
+
+  private static String TOPIC = "topic1";
+
+  private static String COLLECTION = "collection1";
+  private static String ALT_COLLECTION = "collection2";
+
+  /**
+   * Per-test fixture. In order, it:
+   * <ol>
+   *   <li>creates the cross-DC {@link Consumer};</li>
+   *   <li>starts an embedded Kafka cluster whose advertised bootstrap address has
+   *       {@code localhost} rewritten to {@code 127.0.0.1}, and creates {@code TOPIC};</li>
+   *   <li>publishes the topic name, bootstrap servers and the unmirrorable-docs flag as
+   *       system properties before any Solr cluster is configured;</li>
+   *   <li>configures two Solr clusters from the {@code cloud-minimal} config set and creates
+   *       {@code COLLECTION} on each, making it the default collection of each client;</li>
+   *   <li>starts the consumer against Kafka and the second cluster's ZooKeeper address.</li>
+   * </ol>
+   *
+   * @throws Exception if Kafka, either Solr cluster, or the consumer fails to start
+   */
+  @Before
+  public void beforeSolrAndKafkaIntegrationTest() throws Exception {
+    consumer = new Consumer();
+
+    // No broker overrides are applied; the empty Properties object is passed as-is.
+    Properties brokerOverrides = new Properties();
+    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, brokerOverrides) {
+      public String bootstrapServers() {
+        return super.bootstrapServers().replaceAll("localhost", "127.0.0.1");
+      }
+    };
+    kafkaCluster.start();
+    kafkaCluster.createTopic(TOPIC, 1, 1);
+
+    // These are set before the Solr clusters are configured below.
+    System.setProperty("topicName", TOPIC);
+    System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+    System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
+
+    // First cluster: the one the tests index into.
+    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir())
+        .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+        .configure();
+    CollectionAdminRequest.Create createOnFirst =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster1.getSolrClient().request(createOnFirst);
+    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
+    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);
+
+    // Second cluster: the one the consumer is pointed at.
+    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir())
+        .addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath())
+        .configure();
+    CollectionAdminRequest.Create createOnSecond =
+        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
+    solrCluster2.getSolrClient().request(createOnSecond);
+    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
+    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+
+    String bootstrapServers = kafkaCluster.bootstrapServers();
+    log.info("bootstrapServers={}", bootstrapServers);
+
+    Map<String, Object> consumerConfig = new HashMap<>();
+    consumerConfig.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    consumerConfig.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+    consumerConfig.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    consumerConfig.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumerConfig.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES);
+    consumer.start(consumerConfig);
+  }
+
+  /**
+   * Tears down what {@code beforeSolrAndKafkaIntegrationTest()} started: both Solr clusters,
+   * then the cross-DC consumer, then the embedded Kafka cluster.
+   *
+   * <p>Each step runs inside its own {@code finally}, so a failure while stopping one
+   * component does not leave the later ones running. Components that were never created
+   * (for example because setup failed part-way) are skipped instead of dereferenced.
+   *
+   * @throws Exception if shutting down a Solr cluster or the consumer fails; a failure to
+   *         stop Kafka is only logged
+   */
+  @After
+  public void afterSolrAndKafkaIntegrationTest() throws Exception {
+    ObjectReleaseTracker.clear();
+
+    try {
+      if (solrCluster1 != null) {
+        solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+        solrCluster1.shutdown();
+      }
+    } finally {
+      try {
+        if (solrCluster2 != null) {
+          solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+          solrCluster2.shutdown();
+        }
+      } finally {
+        try {
+          if (consumer != null) {
+            consumer.shutdown();
+          }
+        } finally {
+          // The consumer field is static; always reset it so the next test starts clean.
+          consumer = null;
+          try {
+            if (kafkaCluster != null) {
+              kafkaCluster.stop();
+            }
+          } catch (Exception e) {
+            // Best effort: a Kafka shutdown problem must not fail the test itself.
+            log.error("Exception stopping Kafka cluster", e);
+          } finally {
+            kafkaCluster = null;
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a minimal test document with a unique {@code id} and a fixed {@code text} value.
+   *
+   * <p>The id is a random UUID string rather than {@code System.nanoTime()}: the timer's
+   * resolution is platform dependent, so two back-to-back calls are not guaranteed to
+   * return different values, and the tests in this class count documents and therefore
+   * need ids that never repeat.
+   *
+   * @return a new document with {@code id} and {@code text} fields set
+   */
+  private static SolrInputDocument getDoc() {
+    SolrInputDocument doc = new SolrInputDocument();
+    // Fully qualified so no new import is needed in this file.
+    doc.addField("id", java.util.UUID.randomUUID().toString());
+    doc.addField("text", "some test");
+    return doc;
+  }
+
+  /**
+   * Shorthand for {@code assertClusterEventuallyHasDocs} using the second cluster's client.
+   *
+   * @param collection collection to query on cluster 2
+   * @param query query string to run
+   * @param expectedNumDocs exact number of matches that must eventually be reported
+   * @throws Exception if committing or querying fails
+   */
+  private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception {
+    final SolrClient secondClusterClient = solrCluster2.getSolrClient();
+    assertClusterEventuallyHasDocs(secondClusterClient, collection, query, expectedNumDocs);
+  }
+
+  /**
+   * Sends a collection-creation command through the given client.
+   *
+   * @param client client used to issue the request
+   * @param createCmd the collection-creation request to send
+   * @throws Exception if the request fails
+   */
+  private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception {
+    client.request(createCmd);
+  }
+
+  /**
+   * Indexes seven documents into cluster 1, interleaved across {@code COLLECTION} (four
+   * documents) and {@code ALT_COLLECTION} (three documents), and asserts that cluster 2
+   * eventually reports the same per-collection counts.
+   *
+   * <p>{@code ALT_COLLECTION} is created on both clusters at the start and deleted from both
+   * in a {@code finally} block.
+   *
+   * @throws Exception if collection management, indexing, or querying fails
+   */
+  @Test
+  public void testFullCloudToCloudMultiCollection() throws Exception {
+    CollectionAdminRequest.Create createAlt =
+        CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1);
+
+    try {
+      solrCluster1.getSolrClient().request(createAlt);
+      solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+      solrCluster2.getSolrClient().request(createAlt);
+      solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1);
+
+      CloudSolrClient firstClusterClient = solrCluster1.getSolrClient();
+
+      // Destination of each document, in the order the documents are sent.
+      String[] destinations = {
+          COLLECTION, ALT_COLLECTION, COLLECTION, COLLECTION,
+          ALT_COLLECTION, ALT_COLLECTION, COLLECTION
+      };
+
+      // Build every document up front, then send them one request at a time.
+      SolrInputDocument[] docs = new SolrInputDocument[destinations.length];
+      for (int i = 0; i < docs.length; i++) {
+        docs[i] = getDoc();
+      }
+      for (int i = 0; i < docs.length; i++) {
+        firstClusterClient.add(destinations[i], docs[i]);
+      }
+
+      firstClusterClient.commit(COLLECTION);
+      firstClusterClient.commit(ALT_COLLECTION);
+
+      System.out.println("Sent producer record");
+
+      assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 3);
+      assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 4);
+
+    } finally {
+      CollectionAdminRequest.Delete dropAlt =
+          CollectionAdminRequest.deleteCollection(ALT_COLLECTION);
+      solrCluster1.getSolrClient().request(dropAlt);
+      solrCluster2.getSolrClient().request(dropAlt);
+    }
+  }
+
+
+  /**
+   * Polls {@code collection} through {@code client} until {@code query} matches exactly
+   * {@code expectedNumDocs} documents, and fails the test if that never happens.
+   *
+   * <p>At most 100 attempts are made. Every attempt commits before querying; a miss is
+   * followed by a 200 ms sleep. Polling stops at the first matching attempt.
+   *
+   * @param client client used for both the commit and the query
+   * @param collection collection to commit and query
+   * @param query query string to run
+   * @param expectedNumDocs exact {@code numFound} value that counts as success
+   * @throws Exception if committing, querying, or sleeping fails
+   */
+  private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception {
+    QueryResponse results = null;
+    boolean foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      client.commit(collection);
+      results = client.query(collection, new SolrQuery(query));
+      if (results.getResults().getNumFound() == expectedNumDocs) {
+        foundUpdates = true;
+        // Stop here: further rounds would only repeat the commit and query, and the
+        // failure message should show the response that decided the outcome.
+        break;
+      }
+      Thread.sleep(200);
+    }
+
+    assertTrue("results=" + results, foundUpdates);
+  }
+}
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index e25ac83..927c5ab 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -105,6 +105,15 @@ import java.util.*;
   public static void afterSolrAndKafkaIntegrationTest() throws Exception {
     ObjectReleaseTracker.clear();
 
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+
     consumer.shutdown();
 
     try {
@@ -115,14 +124,6 @@ import java.util.*;
       log.error("Exception stopping Kafka cluster", e);
     }
 
-    if (solrCluster1 != null) {
-      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
-      solrCluster1.shutdown();
-    }
-    if (solrCluster2 != null) {
-      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
-      solrCluster2.shutdown();
-    }
   }
 
   @After
@@ -141,7 +142,7 @@ import java.util.*;
 
     QueryResponse results = null;
     boolean foundUpdates = false;
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("*:*"));
@@ -160,17 +161,18 @@ import java.util.*;
     assertEquals("results=" + results1, 7, 
results1.getResults().getNumFound());
     assertEquals("results=" + results2, 7, 
results2.getResults().getNumFound());
 
+    System.out.println("adding second docs");
     addDocs(client, "second");
 
     foundUpdates = false;
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
-      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      results = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("*:*"));
+      solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("text:second"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("text:second"));
       if (results.getResults().getNumFound() == 7) {
         foundUpdates = true;
       } else {
-        Thread.sleep(100);
+        Thread.sleep(200);
       }
     }
 
@@ -179,8 +181,8 @@ import java.util.*;
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 
-    results1 =solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("second"));
-    results2 = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("second"));
+    results1 =solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("text:second"));
+    results2 = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("text:second"));
 
     assertEquals("results=" + results1, 7, 
results1.getResults().getNumFound());
     assertEquals("results=" + results2, 7, 
results2.getResults().getNumFound());
@@ -188,10 +190,10 @@ import java.util.*;
     addDocs(client, "third");
 
     foundUpdates = false;
-    for (int i = 0; i < 1000; i++) {
+    for (int i = 0; i < 100; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
-      solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
-      results = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("*:*"));
+      solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("text:third"));
+      results = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("text:third"));
       if (results.getResults().getNumFound() == 7) {
         foundUpdates = true;
       } else {
@@ -204,8 +206,8 @@ import java.util.*;
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 
-    results1 =solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("third"));
-    results2 = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("third"));
+    results1 =solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("text:third"));
+    results2 = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("text:third"));
 
     assertEquals("results=" + results1, 7, 
results1.getResults().getNumFound());
     assertEquals("results=" + results2, 7, 
results2.getResults().getNumFound());
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index f6c8844..991f67e 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -135,6 +135,15 @@ import java.util.Properties;
   public static void afterSolrAndKafkaIntegrationTest() throws Exception {
     ObjectReleaseTracker.clear();
 
+    if (solrCluster1 != null) {
+      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster1.shutdown();
+    }
+    if (solrCluster2 != null) {
+      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
+      solrCluster2.shutdown();
+    }
+
     consumer1.shutdown();
     consumer2.shutdown();
 
@@ -144,14 +153,6 @@ import java.util.Properties;
       log.error("Exception stopping Kafka cluster", e);
     }
 
-    if (solrCluster1 != null) {
-      solrCluster1.getZkServer().getZkClient().printLayoutToStdOut();
-      solrCluster1.shutdown();
-    }
-    if (solrCluster2 != null) {
-      solrCluster2.getZkServer().getZkClient().printLayoutToStdOut();
-      solrCluster2.shutdown();
-    }
   }
 
   @After
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
new file mode 100644
index 0000000..d21e410
--- /dev/null
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/update/processor/MirroringUpdateProcessorTest.java
@@ -0,0 +1,177 @@
+package org.apache.solr.update.processor;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
+import org.apache.solr.cloud.CloudDescriptor;
+import org.apache.solr.cloud.ZkController;
+import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.cloud.DocRouter;
+import org.apache.solr.common.cloud.Replica;
+import org.apache.solr.common.cloud.Slice;
+import org.apache.solr.common.cloud.ZkStateReader;
+import org.apache.solr.common.params.ModifiableSolrParams;
+import org.apache.solr.common.params.SolrParams;
+import org.apache.solr.core.CoreContainer;
+import org.apache.solr.core.CoreDescriptor;
+import org.apache.solr.core.SolrCore;
+import org.apache.solr.request.SolrQueryRequest;
+import org.apache.solr.request.SolrQueryRequestBase;
+import org.apache.solr.schema.IndexSchema;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.processor.DistributedUpdateProcessor;
+import org.apache.solr.update.processor.MirroringUpdateProcessor;
+import org.apache.solr.update.processor.RequestMirroringHandler;
+import org.apache.solr.update.processor.UpdateRequestProcessor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+public class MirroringUpdateProcessorTest extends SolrTestCaseJ4 {
+
+  private UpdateRequestProcessor next;
+  private MirroringUpdateProcessor processor;
+  private RequestMirroringHandler requestMirroringHandler;
+  private AddUpdateCommand addUpdateCommand;
+  private DeleteUpdateCommand deleteUpdateCommand;
+  private SolrQueryRequestBase req;
+  UpdateRequest requestMock;
+  private UpdateRequestProcessor nextProcessor;
+  private SolrCore core;
+  private HttpSolrClient.Builder builder = 
Mockito.mock(HttpSolrClient.Builder.class);
+  private HttpSolrClient client = Mockito.mock(HttpSolrClient.class);
+  private CloudDescriptor cloudDesc;
+
+  /**
+   * Builds the shared fixture for every test in this class: an add command and a
+   * delete-by-query command bound to a mocked request, a {@code MirroringUpdateProcessor}
+   * whose {@code createMirrorRequest()} is overridden to return {@code requestMock}, and a
+   * chain of Mockito mocks reachable from {@code req.getCore()}.
+   */
+  @Before public void setUp() throws Exception {
+    super.setUp();
+    // NOTE(review): `req` is only assigned two statements further down, so the command is
+    // constructed with whatever `req` holds at this point; the mocked request is attached
+    // later through setReq(req).
+    addUpdateCommand = new AddUpdateCommand(req);
+    addUpdateCommand.solrDoc = new SolrInputDocument();
+    addUpdateCommand.solrDoc.addField("id", "test");
+    req = Mockito.mock(SolrQueryRequestBase.class);
+    Mockito.when(req.getParams()).thenReturn(new ModifiableSolrParams());
+
+    requestMock = Mockito.mock(UpdateRequest.class);
+    addUpdateCommand.setReq(req);
+
+    nextProcessor = Mockito.mock(UpdateRequestProcessor.class);
+
+    IndexSchema schema = Mockito.mock(IndexSchema.class);
+    Mockito.when(req.getSchema()).thenReturn(schema);
+
+    // Delete-by-query command matching every document.
+    deleteUpdateCommand = new DeleteUpdateCommand(req);
+    deleteUpdateCommand.query = "*:*";
+
+    next = Mockito.mock(UpdateRequestProcessor.class);
+    requestMirroringHandler = Mockito.mock(RequestMirroringHandler.class);
+    // Processor under test. The third argument is the indexUnmirrorableDocs flag (see the
+    // inline comment in testProcessAddExceedsLimit); the meaning of the other positional
+    // arguments is presumably (doMirroring, maxSize) -- TODO confirm against the constructor.
+    processor = new MirroringUpdateProcessor(next, true, true, 1000L, new 
ModifiableSolrParams(), DistributedUpdateProcessor.DistribPhase.NONE,
+        requestMirroringHandler) {
+      // Hands back the shared mock so tests can verify mirror(requestMock) exactly.
+      UpdateRequest createMirrorRequest() {
+        return requestMock;
+      }
+    };
+
+    core = Mockito.mock(SolrCore.class);
+    CoreDescriptor coreDesc = Mockito.mock(CoreDescriptor.class);
+    cloudDesc = Mockito.mock(CloudDescriptor.class);
+    CoreContainer coreContainer = Mockito.mock(CoreContainer.class);
+    ZkController zkController = Mockito.mock(ZkController.class);
+    ClusterState clusterState = Mockito.mock(ClusterState.class);
+    DocCollection docCollection = Mockito.mock(DocCollection.class);
+    DocRouter docRouter = Mockito.mock(DocRouter.class);
+    Slice slice = Mockito.mock(Slice.class);
+    ZkStateReader zkStateReader = Mockito.mock(ZkStateReader.class);
+    Replica replica = Mockito.mock(Replica.class);
+
+    // The leader lookup always yields a replica named "replica1"; individual tests stub
+    // cloudDesc.getCoreNodeName() with "replica1" or "replica2".
+    Mockito.when(replica.getName()).thenReturn("replica1");
+    Mockito.when(zkStateReader.getLeaderRetry(Mockito.any(), 
Mockito.any())).thenReturn(replica);
+    Mockito.when(zkController.getZkStateReader()).thenReturn(zkStateReader);
+    Mockito.when(coreDesc.getCloudDescriptor()).thenReturn(cloudDesc);
+    
Mockito.when(clusterState.getCollection(Mockito.any())).thenReturn(docCollection);
+    Mockito.when(docCollection.getRouter()).thenReturn(docRouter);
+    Mockito.when(docRouter.getTargetSlice(Mockito.any(), Mockito.any(), 
Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(slice);
+    Mockito.when(zkController.getClusterState()).thenReturn(clusterState);
+    Mockito.when(coreContainer.getZkController()).thenReturn(zkController);
+    Mockito.when(core.getCoreContainer()).thenReturn(coreContainer);
+    Mockito.when(core.getCoreDescriptor()).thenReturn(coreDesc);
+    // Everything above becomes reachable from the request via req.getCore().
+    Mockito.when(req.getCore()).thenReturn(core);
+  }
+
+  /**
+   * A small document added while the core's node name is "replica1" must be passed to the
+   * next processor and mirrored using the request returned by {@code createMirrorRequest()}.
+   */
+  @Test
+  public void testProcessAddWithinLimit() throws Exception {
+    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+
+    SolrInputDocument smallDoc = new SolrInputDocument();
+    smallDoc.addField("id", "1");
+
+    AddUpdateCommand addCmd = new AddUpdateCommand(req);
+    addCmd.overwrite = true;
+    addCmd.commitWithin = 1000;
+    addCmd.solrDoc = smallDoc;
+
+    processor.processAdd(addCmd);
+
+    // The two verifications are independent of each other's order.
+    Mockito.verify(requestMirroringHandler).mirror(requestMock);
+    Mockito.verify(next).processAdd(addCmd);
+  }
+
+  /**
+   * Adding a document whose {@code large_field} alone holds 50,000 characters to a processor
+   * built with a size argument of 50000 and {@code indexUnmirrorableDocs} set to false must
+   * throw a {@link SolrException}.
+   */
+  @Test
+  public void testProcessAddExceedsLimit() {
+    SolrInputDocument oversizedDoc = new SolrInputDocument();
+    oversizedDoc.addField("id", "123");
+    oversizedDoc.addField("large_field", "Test ".repeat(10000));
+
+    AddUpdateCommand oversizedAdd = new AddUpdateCommand(req);
+    oversizedAdd.solrDoc = oversizedDoc;
+
+    // Replace the descriptor/container chain behind `core` with fresh, unstubbed mocks.
+    CoreDescriptor bareCoreDesc = Mockito.mock(CoreDescriptor.class);
+    CloudDescriptor bareCloudDesc = Mockito.mock(CloudDescriptor.class);
+    CoreContainer bareContainer = Mockito.mock(CoreContainer.class);
+    ZkController bareZkController = Mockito.mock(ZkController.class);
+    ClusterState bareClusterState = Mockito.mock(ClusterState.class);
+
+    Mockito.when(req.getCore()).thenReturn(core);
+    Mockito.when(core.getCoreDescriptor()).thenReturn(bareCoreDesc);
+    Mockito.when(bareCoreDesc.getCloudDescriptor()).thenReturn(bareCloudDesc);
+    Mockito.when(core.getCoreContainer()).thenReturn(bareContainer);
+    Mockito.when(bareContainer.getZkController()).thenReturn(bareZkController);
+    Mockito.when(bareZkController.getClusterState()).thenReturn(bareClusterState);
+
+    SolrParams mirrorParams = new ModifiableSolrParams();
+    MirroringUpdateProcessor limitedProcessor = new MirroringUpdateProcessor(
+        nextProcessor,
+        true,
+        false, // indexUnmirrorableDocs set to false
+        50000,
+        mirrorParams,
+        DistributedUpdateProcessor.DistribPhase.NONE,
+        requestMirroringHandler);
+
+    assertThrows(SolrException.class, () -> limitedProcessor.processAdd(oversizedAdd));
+  }
+
+  /**
+   * With the core's node name stubbed to "replica1" (the name of the replica returned by the
+   * mocked leader lookup), an add must be mirrored exactly once.
+   */
+  @Test
+  public void testProcessAddLeader() throws Exception {
+    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+
+    processor.processAdd(addUpdateCommand);
+
+    // verify(mock) with no explicit mode checks for exactly one invocation.
+    Mockito.verify(requestMirroringHandler).mirror(Mockito.any());
+  }
+
+  /**
+   * With the core's node name stubbed to "replica2" (which differs from the "replica1"
+   * returned by the mocked leader lookup), an add must not be mirrored at all.
+   */
+  @Test
+  public void testProcessAddNotLeader() throws Exception {
+    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica2");
+
+    processor.processAdd(addUpdateCommand);
+
+    Mockito.verify(requestMirroringHandler, Mockito.never()).mirror(Mockito.any());
+  }
+
+  /**
+   * With the core's node name stubbed to "replica1", the {@code *:*} delete-by-query command
+   * must be mirrored exactly once.
+   */
+  @Test
+  public void testProcessDelete() throws Exception {
+    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+
+    processor.processDelete(deleteUpdateCommand);
+
+    // verify(mock) with no explicit mode checks for exactly one invocation.
+    Mockito.verify(requestMirroringHandler).mirror(Mockito.any());
+  }
+
+  /**
+   * Runs an add followed by the {@code *:*} delete-by-query command with the core's node
+   * name stubbed to "replica1".
+   *
+   * <p>NOTE(review): this test makes no assertions or verifications; it only checks that
+   * both calls complete without throwing.
+   */
+  @Test public void testProcessDBQResults() throws Exception {
+    Mockito.when(cloudDesc.getCoreNodeName()).thenReturn("replica1");
+    // NOTE(review): nothing visible in this test hands `builder` to the processor, so this
+    // stubbing may have no effect -- confirm before relying on it.
+    Mockito.when(builder.build()).thenReturn(client);
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.addField("id", "test");
+    addUpdateCommand.solrDoc = doc;
+    processor.processAdd(addUpdateCommand);
+
+    processor.processDelete(deleteUpdateCommand);
+  }
+
+  /** Smoke test: {@code finish()} on the processor under test must complete without throwing. */
+  @Test
+  public void testFinish() throws IOException {
+    processor.finish();
+  }
+}
\ No newline at end of file
diff --git a/crossdc-producer/src/test/resources/log4j2.xml 
b/crossdc-producer/src/test/resources/log4j2.xml
index 98c24fc..754c324 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -59,11 +59,11 @@
     <Logger name="org.apache.solr.hadoop" level="INFO"/>
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
-    <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" 
level="INFO"/>
-    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" 
level="INFO"/>
-    <Logger 
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" 
level="INFO"/>
-    <Logger 
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" 
level="INFO"/>
-    <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" 
level="INFO"/>
+    <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" 
level="TRACE"/>
+    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" 
level="TRACE"/>
+    <Logger 
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" 
level="TRACE"/>
+    <Logger 
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" 
level="TRACE"/>
+    <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" 
level="TRACE"/>
 
 
     <Root level="INFO">
diff --git a/gradlew b/gradlew
old mode 100755
new mode 100644

Reply via email to