This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 8f1e903 Beef up config override test, additional config logging. (#42)
8f1e903 is described below
commit 8f1e9030e93f2bbb31cacc7bd6ddfe3497a309ab
Author: Mark Robert Miller <[email protected]>
AuthorDate: Thu Sep 15 16:45:49 2022 -0500
Beef up config override test, additional config logging. (#42)
---
.../solr/crossdc/common/KafkaMirroringSink.java | 3 +-
.../org/apache/solr/crossdc/consumer/Consumer.java | 2 +
.../MirroringUpdateRequestProcessorFactory.java | 7 +-
.../solr/crossdc/ZkConfigIntegrationTest.java | 77 +++++++++++++++++-----
4 files changed, 67 insertions(+), 22 deletions(-)
diff --git a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index a1cd2a0..df48c60 100644
--- a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -43,7 +43,6 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// Create Kafka Mirroring Sink
this.conf = conf;
this.producer = initProducer();
- log.info("KafkaMirroringSink has been created. Producer & Topic have been created successfully! Configurations {}", conf);
}
@Override
@@ -91,7 +90,7 @@ public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
// Initialize and return Kafka producer
Properties kafkaProducerProps = new Properties();
- log.info("Creating Kafka producer! Configurations {} ", conf.toString());
+ log.info("Starting CrossDC Producer {}", conf);
kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
diff --git a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index 122205f..f20dacd 100644
--- a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -60,6 +60,8 @@ public class Consumer {
}
}
+ log.info("Consumer startup config properties before adding additional properties from Zookeeper={}", properties);
+
String zkConnectString = (String)
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
if (zkConnectString == null || zkConnectString.isBlank()) {
throw new IllegalArgumentException("zkConnectString not specified for Consumer");
diff --git a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 207f0a6..654be15 100644
--- a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -131,6 +131,9 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
if (!enabled) {
return;
}
+
+ log.info("Producer startup config properties before adding additional properties from Zookeeper={}", properties);
+
Properties zkProps = null;
try {
if (core.getCoreContainer().getZkController()
@@ -173,12 +176,12 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"topicName not specified for producer");
}
- log.info("bootstrapServers={} topicName={}", properties.get(BOOTSTRAP_SERVERS), properties.get(TOPIC_NAME));
-
// load the request mirroring sink class and instantiate.
// mirroringHandler =
core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(),
KafkaRequestMirroringHandler.class);
KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+
+
KafkaMirroringSink sink = new KafkaMirroringSink(conf);
Closer closer = new Closer(sink);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index bcb28c2..f6c8844 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -2,10 +2,6 @@ package org.apache.solr.crossdc;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.lucene.util.QuickPatchThreadsFilter;
import org.apache.solr.SolrIgnoredThreadsFilter;
@@ -13,15 +9,12 @@ import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
-import org.apache.solr.crossdc.common.MirroredSolrRequest;
-import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.consumer.Consumer;
import org.junit.After;
import org.junit.AfterClass;
@@ -29,7 +22,6 @@ import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
@@ -51,9 +43,11 @@ import java.util.Properties;
protected static volatile MiniSolrCloudCluster solrCluster1;
protected static volatile MiniSolrCloudCluster solrCluster2;
- protected static volatile Consumer consumer = new Consumer();
+ protected static volatile Consumer consumer1 = new Consumer();
+ protected static volatile Consumer consumer2 = new Consumer();
- private static String TOPIC = "topic1";
+ private static String TOPIC1 = "topicSrc";
+ private static String TOPIC2 = "topicDst";
private static String COLLECTION = "collection1";
@@ -71,7 +65,8 @@ import java.util.Properties;
};
kafkaCluster.start();
- kafkaCluster.createTopic(TOPIC, 1, 1);
+ kafkaCluster.createTopic(TOPIC1, 1, 1);
+ kafkaCluster.createTopic(TOPIC2, 1, 1);
// System.setProperty("topicName", null);
// System.setProperty("bootstrapServers", null);
@@ -81,11 +76,11 @@ import java.util.Properties;
solrCluster1 = new SolrCloudTestCase.Builder(1,
createTempDir()).addConfig("conf",
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
- props.setProperty(KafkaCrossDcConf.TOPIC_NAME, "bad_topic");
+ props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS,
kafkaCluster.bootstrapServers());
- System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
-
+ System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+ System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
props.store(baos, "");
@@ -102,7 +97,6 @@ import java.util.Properties;
solrCluster2 = new SolrCloudTestCase.Builder(1,
createTempDir()).addConfig("conf",
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
- solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
CollectionAdminRequest.Create create2 =
CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
@@ -111,21 +105,38 @@ import java.util.Properties;
solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);
+ props = new Properties();
+ props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
+
+
+ baos = new ByteArrayOutputStream();
+ props.store(baos, "");
+ data = baos.toByteArray();
+ solrCluster2.getSolrClient().getZkStateReader().getZkClient().makePath("/crossdc.properties", data, true);
+
+
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
Map<String, Object> properties = new HashMap<>();
- properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+ Object put = properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
+
+ System.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, kafkaCluster.bootstrapServers());
- consumer.start(properties);
+ consumer1.start(properties);
+ System.setProperty(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster1.getZkServer().getZkAddress());
+ System.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC2);
+ System.setProperty("port", "8383");
+ consumer2.start();
}
@AfterClass
public static void afterSolrAndKafkaIntegrationTest() throws Exception {
ObjectReleaseTracker.clear();
- consumer.shutdown();
+ consumer1.shutdown();
+ consumer2.shutdown();
try {
kafkaCluster.stop();
@@ -182,6 +193,36 @@ import java.util.Properties;
assertTrue("results=" + results, foundUpdates);
System.out.println("Rest: " + results);
+
+
+ client = solrCluster2.getSolrClient();
+ doc = new SolrInputDocument();
+ doc.addField("id", String.valueOf(System.currentTimeMillis()));
+ doc.addField("text", "some test2");
+
+ client.add(doc);
+
+ client.commit(COLLECTION);
+
+ System.out.println("Sent producer record");
+
+ results = null;
+ foundUpdates = false;
+ for (int i = 0; i < 100; i++) {
+ solrCluster1.getSolrClient().commit(COLLECTION);
+ solrCluster2.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ results = solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
+ if (results.getResults().getNumFound() == 1) {
+ foundUpdates = true;
+ } else {
+ Thread.sleep(100);
+ }
+ }
+
+ System.out.println("Closed producer");
+
+ assertTrue("results=" + results, foundUpdates);
+ System.out.println("Rest: " + results);
}
}