This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 8f1e903  Beef up config override test, additional config logging. (#42)
8f1e903 is described below

commit 8f1e9030e93f2bbb31cacc7bd6ddfe3497a309ab
Author: Mark Robert Miller <[email protected]>
AuthorDate: Thu Sep 15 16:45:49 2022 -0500

    Beef up config override test, additional config logging. (#42)
---
 .../solr/crossdc/common/KafkaMirroringSink.java    |  3 +-
 .../org/apache/solr/crossdc/consumer/Consumer.java |  2 +
 .../MirroringUpdateRequestProcessorFactory.java    |  7 +-
 .../solr/crossdc/ZkConfigIntegrationTest.java      | 77 +++++++++++++++++-----
 4 files changed, 67 insertions(+), 22 deletions(-)

diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index a1cd2a0..df48c60 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -43,7 +43,6 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         // Create Kafka Mirroring Sink
         this.conf = conf;
         this.producer = initProducer();
-        log.info("KafkaMirroringSink has been created. Producer & Topic have 
been created successfully! Configurations {}", conf);
     }
 
     @Override
@@ -91,7 +90,7 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         // Initialize and return Kafka producer
         Properties kafkaProducerProps = new Properties();
 
-        log.info("Creating Kafka producer! Configurations {} ", 
conf.toString());
+        log.info("Starting CrossDC Producer {}", conf);
 
         kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 122205f..f20dacd 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -60,6 +60,8 @@ public class Consumer {
             }
         }
 
+        log.info("Consumer startup config properties before adding additional 
properties from Zookeeper={}", properties);
+
         String zkConnectString = (String) 
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
         if (zkConnectString == null || zkConnectString.isBlank()) {
             throw new IllegalArgumentException("zkConnectString not specified 
for Consumer");
diff --git 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 207f0a6..654be15 100644
--- 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -131,6 +131,9 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
         if (!enabled) {
             return;
         }
+
+        log.info("Producer startup config properties before adding additional 
properties from Zookeeper={}", properties);
+
         Properties zkProps = null;
         try {
             if (core.getCoreContainer().getZkController()
@@ -173,12 +176,12 @@ public class MirroringUpdateRequestProcessorFactory 
extends UpdateRequestProcess
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"topicName not specified for producer");
         }
 
-        log.info("bootstrapServers={} topicName={}", 
properties.get(BOOTSTRAP_SERVERS), properties.get(TOPIC_NAME));
-
         // load the request mirroring sink class and instantiate.
        // mirroringHandler = 
core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), 
KafkaRequestMirroringHandler.class);
 
         KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+
+
         KafkaMirroringSink sink = new KafkaMirroringSink(conf);
 
         Closer closer = new Closer(sink);
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index bcb28c2..f6c8844 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -2,10 +2,6 @@ package org.apache.solr.crossdc;
 
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.lucene.util.QuickPatchThreadsFilter;
 import org.apache.solr.SolrIgnoredThreadsFilter;
@@ -13,15 +9,12 @@ import org.apache.solr.SolrTestCaseJ4;
 import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -29,7 +22,6 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
@@ -51,9 +43,11 @@ import java.util.Properties;
   protected static volatile MiniSolrCloudCluster solrCluster1;
   protected static volatile MiniSolrCloudCluster solrCluster2;
 
-  protected static volatile Consumer consumer = new Consumer();
+  protected static volatile Consumer consumer1 = new Consumer();
+  protected static volatile Consumer consumer2 = new Consumer();
 
-  private static String TOPIC = "topic1";
+  private static String TOPIC1 = "topicSrc";
+  private static String TOPIC2 = "topicDst";
 
   private static String COLLECTION = "collection1";
 
@@ -71,7 +65,8 @@ import java.util.Properties;
     };
     kafkaCluster.start();
 
-    kafkaCluster.createTopic(TOPIC, 1, 1);
+    kafkaCluster.createTopic(TOPIC1, 1, 1);
+    kafkaCluster.createTopic(TOPIC2, 1, 1);
 
     // System.setProperty("topicName", null);
     // System.setProperty("bootstrapServers", null);
@@ -81,11 +76,11 @@ import java.util.Properties;
     solrCluster1 = new SolrCloudTestCase.Builder(1, 
createTempDir()).addConfig("conf",
         
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
-    props.setProperty(KafkaCrossDcConf.TOPIC_NAME, "bad_topic");
+    props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
     props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, 
kafkaCluster.bootstrapServers());
 
-    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
-
+    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, 
kafkaCluster.bootstrapServers());
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     props.store(baos, "");
@@ -102,7 +97,6 @@ import java.util.Properties;
     solrCluster2 = new SolrCloudTestCase.Builder(1, 
createTempDir()).addConfig("conf",
         
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
-    
solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties",
 data, true);
 
     CollectionAdminRequest.Create create2 =
         CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
@@ -111,21 +105,38 @@ import java.util.Properties;
 
     solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
 
+    props = new Properties();
+    props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, 
kafkaCluster.bootstrapServers());
+
+
+    baos = new ByteArrayOutputStream();
+    props.store(baos, "");
+    data = baos.toByteArray();
+    
solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties",
 data, true);
+
+
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
     Map<String, Object> properties = new HashMap<>();
-    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    Object put = properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+
+    System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, 
kafkaCluster.bootstrapServers());
 
-    consumer.start(properties);
+    consumer1.start(properties);
 
+    System.setProperty(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster1.getZkServer().getZkAddress());
+    System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+    System.setProperty("port", "8383");
+    consumer2.start();
   }
 
   @AfterClass
   public static void afterSolrAndKafkaIntegrationTest() throws Exception {
     ObjectReleaseTracker.clear();
 
-    consumer.shutdown();
+    consumer1.shutdown();
+    consumer2.shutdown();
 
     try {
       kafkaCluster.stop();
@@ -182,6 +193,36 @@ import java.util.Properties;
     assertTrue("results=" + results, foundUpdates);
     System.out.println("Rest: " + results);
 
+
+
+    client = solrCluster2.getSolrClient();
+    doc = new SolrInputDocument();
+    doc.addField("id", String.valueOf(System.currentTimeMillis()));
+    doc.addField("text", "some test2");
+
+    client.add(doc);
+
+    client.commit(COLLECTION);
+
+    System.out.println("Sent producer record");
+
+    results = null;
+    foundUpdates = false;
+    for (int i = 0; i < 100; i++) {
+      solrCluster1.getSolrClient().commit(COLLECTION);
+      solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+      results = solrCluster1.getSolrClient().query(COLLECTION, new 
SolrQuery("*:*"));
+      if (results.getResults().getNumFound() == 1) {
+        foundUpdates = true;
+      } else {
+        Thread.sleep(100);
+      }
+    }
+
+    System.out.println("Closed producer");
+
+    assertTrue("results=" + results, foundUpdates);
+    System.out.println("Rest: " + results);
   }
 
 }

Reply via email to