This is an automated email from the ASF dual-hosted git repository.
anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new ddfab4b Configuration improvements, cleanup, minor fixes. (#35)
ddfab4b is described below
commit ddfab4b953897a94f946b558539057cc7e64b23c
Author: Mark Robert Miller <[email protected]>
AuthorDate: Tue Sep 13 14:14:13 2022 -0500
Configuration improvements, cleanup, minor fixes. (#35)
---
.../apache/solr/crossdc/common/ConfigProperty.java | 67 ++++++
.../apache/solr/crossdc/common/CrossDcConf.java | 2 -
.../solr/crossdc/common/KafkaCrossDcConf.java | 257 ++++++++++++---------
.../solr/crossdc/common/KafkaMirroringSink.java | 55 +++--
.../org/apache/solr/crossdc/consumer/Consumer.java | 174 ++++----------
.../crossdc/consumer/KafkaCrossDcConsumer.java | 32 +--
.../MirroringUpdateRequestProcessorFactory.java | 101 ++++----
.../apache/solr/crossdc/DeleteByQueryToIdTest.java | 11 +-
.../solr/crossdc/RetryQueueIntegrationTest.java | 11 +-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 12 +-
.../solr/crossdc/SolrAndKafkaReindexTest.java | 16 +-
.../solr/crossdc/ZkConfigIntegrationTest.java | 15 +-
crossdc-producer/src/test/resources/log4j2.xml | 12 +-
13 files changed, 406 insertions(+), 359 deletions(-)
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
new file mode 100644
index 0000000..256b6e0
--- /dev/null
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
@@ -0,0 +1,67 @@
+package org.apache.solr.crossdc.common;
+
+import java.util.Map;
+import java.util.Properties;
+
/**
 * Describes a single CrossDC configuration property: its key, an optional
 * default value, and whether the property is required.
 *
 * Values are resolved from a plain String-to-String map of raw configuration
 * (as built by {@code KafkaCrossDcConf}); typed accessors fall back to the
 * default value when the map has no entry for the key.
 */
public class ConfigProperty {

  // Configuration key, e.g. "bootstrapServers".
  private final String key;
  // Fallback used when the key is absent from the supplied map; may be null.
  private final String defaultValue;

  // True when the property must be supplied by the user.
  private boolean required = false;

  public ConfigProperty(String key, String defaultValue, boolean required) {
    this.key = key;
    this.defaultValue = defaultValue;
    this.required = required;
  }

  public ConfigProperty(String key, String defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  public ConfigProperty(String key) {
    this.key = key;
    this.defaultValue = null;
  }

  public String getKey() {
    return key;
  }

  public boolean isRequired() {
    return required;
  }

  public String getDefaultValue() {
    return defaultValue;
  }

  /**
   * Returns the raw value for this property from {@code properties}, or the
   * default value (possibly null) when the map has no entry for the key.
   */
  public String getValue(Map<String, String> properties) {
    String val = properties.get(key);
    if (val == null) {
      return defaultValue;
    }
    return val;
  }

  /**
   * Returns the value parsed as an Integer, falling back to the default value;
   * returns null when neither the map entry nor the default is set.
   *
   * @throws NumberFormatException if the effective value is not a valid int
   */
  public Integer getValueAsInt(Map<String, String> properties) {
    String value = properties.get(key);
    if (value == null) {
      value = defaultValue;
    }
    if (value == null) {
      return null;
    }
    return Integer.parseInt(value);
  }

  /**
   * Returns the value parsed as a Boolean, falling back to the default value.
   * Follows {@link Boolean#parseBoolean} semantics: a null or unparseable
   * value yields {@code false}.
   */
  public Boolean getValueAsBoolean(Map<String, String> properties) {
    String value = properties.get(key);
    if (value == null) {
      value = defaultValue;
    }
    return Boolean.parseBoolean(value);
  }
}
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 5b67f91..b34ae5b 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -19,6 +19,4 @@ package org.apache.solr.crossdc.common;
public abstract class CrossDcConf {
public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
-
- public abstract String getClusterName();
}
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index c1d6149..85738b9 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -16,131 +16,182 @@
*/
package org.apache.solr.crossdc.common;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
+import java.util.*;
+
public class KafkaCrossDcConf extends CrossDcConf {
- public static final String DEFAULT_BATCH_SIZE_BYTES = "512000";
- public static final String DEFAULT_BUFFER_MEMORY_BYTES = "268435456";
- public static final String DEFAULT_LINGER_MS = "30";
- public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
-
- private final String topicName;
-
- private final String groupId;
- private final boolean enableDataEncryption;
- private final String bootstrapServers;
- private long slowSubmitThresholdInMillis = 1000;
- private int numOfRetries = 5;
- private final String solrZkConnectString;
-
- private final int maxPollRecords;
-
- private final int batchSizeBytes;
- private final int bufferMemoryBytes;
- private final int lingerMs;
- private final int requestTimeout;
-
- private final int fetchMinBytes;
-
- private final int fetchMaxWaitMS;
-
- public KafkaCrossDcConf(String bootstrapServers, String topicName, String
groupId, int maxPollRecords, int batchSizeBytes, int bufferMemoryBytes, int
lingerMs, int requestTimeout,
- int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption,
String solrZkConnectString) {
- this.bootstrapServers = bootstrapServers;
- this.topicName = topicName;
- this.enableDataEncryption = enableDataEncryption;
- this.solrZkConnectString = solrZkConnectString;
- this.groupId = groupId;
- this.maxPollRecords = maxPollRecords;
- this.batchSizeBytes = batchSizeBytes;
- this.bufferMemoryBytes = bufferMemoryBytes;
- this.lingerMs = lingerMs;
- this.requestTimeout = requestTimeout;
- this.fetchMinBytes = fetchMinBytes;
- this.fetchMaxWaitMS = fetchMaxWaitMS;
- }
- public String getTopicName() {
- return topicName;
- }
+ public static final String DEFAULT_BATCH_SIZE_BYTES = "2097152";
+ public static final String DEFAULT_BUFFER_MEMORY_BYTES = "536870912";
+ public static final String DEFAULT_LINGER_MS = "30";
+ public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
+ public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
+ public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
+ public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
+ public static final String DEFAULT_NUM_RETRIES = null; // by default, we
control retries with DELIVERY_TIMEOUT_MS_DOC
+ private static final String DEFAULT_RETRY_BACKOFF_MS = "500";
- public boolean getEnableDataEncryption() { return enableDataEncryption; }
+ private static final String DEFAULT_DELIVERY_TIMEOUT_MS = "120000";
- public long getSlowSubmitThresholdInMillis() {
- return slowSubmitThresholdInMillis;
- }
+ public static final String DEFAULT_MAX_POLL_RECORDS = "500"; // same default
as Kafka
- public void setSlowSubmitThresholdInMillis(long
slowSubmitThresholdInMillis) {
- this.slowSubmitThresholdInMillis = slowSubmitThresholdInMillis;
- }
+ private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
+ private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000"; // Kafka
default is 500
- public int getNumOfRetries() {
- return numOfRetries;
- }
+ public static final String DEFAULT_FETCH_MAX_BYTES = "100663296";
- public String getSolrZkConnectString() {
- return solrZkConnectString;
- }
+ public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
- @Override
- public String getClusterName() {
- return null;
- }
+ public static final String DEFAULT_PORT = "8090";
- public String getGroupId() {
- return groupId;
- }
+ private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
- public int getMaxPollRecords() {
- return maxPollRecords;
- }
- public String getBootStrapServers() {
- return bootstrapServers;
- }
+ public static final String TOPIC_NAME = "topicName";
- public int getBatchSizeBytes() {
- return batchSizeBytes;
- }
+ public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
- public int getBufferMemoryBytes() {
- return bufferMemoryBytes;
- }
+ public static final String BATCH_SIZE_BYTES = "batchSizeBytes";
- public int getLingerMs() {
- return lingerMs;
- }
+ public static final String BUFFER_MEMORY_BYTES = "bufferMemoryBytes";
+
+ public static final String LINGER_MS = "lingerMs";
+
+ public static final String REQUEST_TIMEOUT_MS = "requestTimeoutMS";
+
+ public static final String MAX_REQUEST_SIZE_BYTES = "maxRequestSizeBytes";
+
+ public static final String ENABLE_DATA_COMPRESSION = "enableDataCompression";
+
+ public static final String SLOW_SUBMIT_THRESHOLD_MS =
"slowSubmitThresholdMs";
+
+ public static final String NUM_RETRIES = "numRetries";
+
+ public static final String RETRY_BACKOFF_MS = "retryBackoffMs";
+
+  public static final String DELIVERY_TIMEOUT_MS = "deliveryTimeoutMs";
+
+ public static final String FETCH_MIN_BYTES = "fetchMinBytes";
+
+ public static final String FETCH_MAX_WAIT_MS = "fetchMaxWaitMS";
+
+ public static final String MAX_POLL_RECORDS = "maxPollRecords";
+
+ public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
- public int getRequestTimeout() {
- return requestTimeout;
+ public static final String MAX_PARTITION_FETCH_BYTES =
"maxPartitionFetchBytes";
+
+ public static final String ZK_CONNECT_STRING = "zkConnectString";
+
+
+
+
+ public static final List<ConfigProperty> CONFIG_PROPERTIES;
+ private static final HashMap<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+
+ public static final String PORT = "port";
+
+ public static final String GROUP_ID = "groupId";
+
+
+
+ static {
+ CONFIG_PROPERTIES =
+ List.of(new ConfigProperty(TOPIC_NAME), new
ConfigProperty(BOOTSTRAP_SERVERS),
+ new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
+ new ConfigProperty(BUFFER_MEMORY_BYTES,
DEFAULT_BUFFER_MEMORY_BYTES),
+ new ConfigProperty(LINGER_MS, DEFAULT_LINGER_MS),
+ new ConfigProperty(REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT),
+ new ConfigProperty(MAX_REQUEST_SIZE_BYTES,
DEFAULT_MAX_REQUEST_SIZE),
+ new ConfigProperty(ENABLE_DATA_COMPRESSION,
DEFAULT_ENABLE_DATA_COMPRESSION),
+ new ConfigProperty(SLOW_SUBMIT_THRESHOLD_MS,
DEFAULT_SLOW_SEND_THRESHOLD),
+ new ConfigProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES),
+ new ConfigProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS),
+ new ConfigProperty(DELIVERY_TIMEOUT_MS,
DEFAULT_DELIVERY_TIMEOUT_MS),
+
+ // Consumer only zkConnectString
+ new ConfigProperty(ZK_CONNECT_STRING, null),
+ new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
+ new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
+ new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+
+ new ConfigProperty(MAX_PARTITION_FETCH_BYTES,
DEFAULT_MAX_PARTITION_FETCH_BYTES),
+ new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
+ new ConfigProperty(PORT, DEFAULT_PORT),
+ new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID)
+ );
+
+
+
+ CONFIG_PROPERTIES_MAP = new HashMap<String,
ConfigProperty>(CONFIG_PROPERTIES.size());
+ for (ConfigProperty prop : CONFIG_PROPERTIES) {
+ CONFIG_PROPERTIES_MAP.put(prop.getKey(), prop);
}
+ }
+
+ private final Map<String, String> properties;
+
+ public KafkaCrossDcConf(Map<String, String> properties) {
+ List<String> nullValueKeys = new ArrayList<String>();
+ properties.forEach((k, v) -> {
+ if (v == null) {
+ nullValueKeys.add(k);
+ }
+ });
+ nullValueKeys.forEach(properties::remove);
+ this.properties = properties;
+ }
+
+ public String get(String property) {
+ return CONFIG_PROPERTIES_MAP.get(property).getValue(properties);
+ }
- public int getFetchMinBytes() {
- return fetchMinBytes;
+ public Integer getInt(String property) {
+ ConfigProperty prop = CONFIG_PROPERTIES_MAP.get(property);
+ if (prop == null) {
+ throw new IllegalArgumentException("Property not found key=" + property);
}
+ return prop.getValueAsInt(properties);
+ }
- public int getFetchMaxWaitMS() {
- return fetchMaxWaitMS;
+ public Boolean getBool(String property) {
+ ConfigProperty prop = CONFIG_PROPERTIES_MAP.get(property);
+ if (prop == null) {
+ throw new IllegalArgumentException("Property not found key=" + property);
}
+ return prop.getValueAsBoolean(properties);
+ }
+
+ public Properties getAdditionalProperties() {
+ Properties additional = new Properties();
+ additional.putAll(properties);
+ for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
+ additional.remove(configProperty.getKey());
+ }
+ Map<String, Object> integerProperties = new HashMap<>();
+ additional.forEach((k, v) -> {
+ try {
+ int intVal = Integer.parseInt((String) v);
+ integerProperties.put(k.toString(), intVal);
+ } catch (NumberFormatException ignored) {
+
+ }
+ });
+ additional.putAll(integerProperties);
+ return additional;
+ }
- @Override
- public String toString() {
- return String.format("KafkaCrossDcConf{" +
- "topicName='%s', " +
- "groupId='%s', " +
- "enableDataEncryption='%b', " +
- "bootstrapServers='%s', " +
- "slowSubmitThresholdInMillis='%d', " +
- "numOfRetries='%d', " +
- "batchSizeBytes='%d', " +
- "bufferMemoryBytes='%d', " +
- "lingerMs='%d', " +
- "requestTimeout='%d', " +
- "fetchMinBytes='%d', " +
- "fetchMaxWaitMS='%d', " +
- "solrZkConnectString='%s'}",
- topicName, groupId, enableDataEncryption, bootstrapServers,
- slowSubmitThresholdInMillis, numOfRetries, batchSizeBytes,
- bufferMemoryBytes, lingerMs, requestTimeout, fetchMinBytes,
fetchMaxWaitMS, solrZkConnectString);
+ @Override public String toString() {
+ StringBuilder sb = new StringBuilder(128);
+ for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
+ sb.append(configProperty.getKey()).append("=")
+ .append(properties.get(configProperty.getKey())).append(",");
}
+ sb.setLength(sb.length() - 1);
+
+ return "KafkaCrossDcConf{" + sb.toString() + "}";
+ }
+
}
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 787e6af..5afef79 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -30,12 +30,13 @@ import java.lang.invoke.MethodHandles;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
+import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS;
+
public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private long lastSuccessfulEnqueueNanos;
- private KafkaCrossDcConf conf;
+ private final KafkaCrossDcConf conf;
private final Producer<String, MirroredSolrRequest> producer;
public KafkaMirroringSink(final KafkaCrossDcConf conf) {
@@ -56,27 +57,25 @@ public class KafkaMirroringSink implements
RequestMirroringSink, Closeable {
// Create Producer record
try {
- producer.send(new ProducerRecord(conf.getTopicName(), request),
(metadata, exception) -> {
- if (log.isDebugEnabled()) {
- log.debug("Producer finished sending metadata={},
exception={}", metadata,
- exception);
+ producer.send(new
ProducerRecord<>(conf.get(KafkaCrossDcConf.TOPIC_NAME), request), (metadata,
exception) -> {
+ if (exception != null) {
+ log.error("Failed adding update to CrossDC queue!
request=" + request.getSolrRequest(), exception);
}
});
- lastSuccessfulEnqueueNanos = System.nanoTime();
+ long lastSuccessfulEnqueueNanos = System.nanoTime();
// Record time since last successful enqueue as 0
long elapsedTimeMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
// Update elapsed time
- if (elapsedTimeMillis > conf.getSlowSubmitThresholdInMillis()) {
+ if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
slowSubmitAction(request, elapsedTimeMillis);
}
} catch (Exception e) {
// We are intentionally catching all exceptions, the expected
exception form this function is {@link MirroringException}
-
- String message = String.format("Unable to enqueue request %s, # of
attempts %s", request, conf.getNumOfRetries());
+ String message = "Unable to enqueue request " + request + ",
configured retries is " + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
+ " and configured max delivery timeout in ms is " +
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
log.error(message, e);
-
throw new MirroringException(message, e);
}
}
@@ -90,31 +89,39 @@ public class KafkaMirroringSink implements
RequestMirroringSink, Closeable {
*/
private Producer<String, MirroredSolrRequest> initProducer() {
// Initialize and return Kafka producer
- Properties props = new Properties();
+ Properties kafkaProducerProps = new Properties();
log.info("Creating Kafka producer! Configurations {} ",
conf.toString());
- props.put("bootstrap.servers", conf.getBootStrapServers());
+ kafkaProducerProps.put("bootstrap.servers",
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+
+ kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
+ if (retries != null) {
+ kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG,
Integer.parseInt(retries));
+ }
+ kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,
conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
+ kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
+ kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG,
conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
+ kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
+ kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG,
conf.getInt(KafkaCrossDcConf.LINGER_MS));
+ kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time
that causes consumer to be kicked out
+ kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));
- props.put("acks", "all");
- props.put("retries", 3);
- props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getBatchSizeBytes());
- props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
conf.getBufferMemoryBytes());
- props.put(ProducerConfig.LINGER_MS_CONFIG, conf.getLingerMs());
- props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
conf.getRequestTimeout()); // should be less than time that causes consumer to
be kicked out
+ kafkaProducerProps.put("key.serializer",
StringSerializer.class.getName());
+ kafkaProducerProps.put("value.serializer",
MirroredSolrRequestSerializer.class.getName());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer",
MirroredSolrRequestSerializer.class.getName());
+ kafkaProducerProps.putAll(conf.getAdditionalProperties());
if (log.isDebugEnabled()) {
- log.debug("Kafka Producer props={}", props);
+ log.debug("Kafka Producer props={}", kafkaProducerProps);
}
ClassLoader originalContextClassLoader =
Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(null);
Producer<String, MirroredSolrRequest> producer;
try {
- producer = new KafkaProducer(props);
+ producer = new KafkaProducer<>(kafkaProducerProps);
} finally {
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
}
@@ -123,7 +130,7 @@ public class KafkaMirroringSink implements
RequestMirroringSink, Closeable {
private void slowSubmitAction(Object request, long elapsedTimeMillis) {
log.warn("Enqueuing the request to Kafka took more than {} millis.
enqueueElapsedTime={}",
- conf.getSlowSubmitThresholdInMillis(),
+ conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
elapsedTimeMillis);
}
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index a19da19..f77a5bb 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.consumer;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.crossdc.common.ConfigProperty;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
@@ -29,6 +30,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -37,17 +40,9 @@ import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
// Cross-DC Consumer main class
public class Consumer {
- public static final String DEFAULT_PORT = "8090";
- public static final String TOPIC_NAME = "topicName";
- public static final String GROUP_ID = "groupId";
- public static final String PORT = "port";
- public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
- private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
- private static final String MAX_POLL_RECORDS = "maxPollRecords";
- public static final String DEFAULT_MAX_POLL_RECORDS = "500";
-
- private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
- private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000";
+
+
+
private static boolean enabled = true;
@@ -61,57 +56,19 @@ public class Consumer {
private Server server;
CrossDcConsumer crossDcConsumer;
- public void start(String bootstrapServers, String zkConnectString, String
topicName, String groupId, int maxPollRecords, int batchSizeBytes, int
bufferMemoryBytes, int lingerMs, int requestTimeout,
- int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption,
int port) {
- if (bootstrapServers == null) {
- throw new IllegalArgumentException("bootstrapServers config was
not passed at startup");
- }
- if (zkConnectString == null) {
- throw new IllegalArgumentException("zkConnectString config was not
passed at startup");
- }
- if (topicName == null) {
- throw new IllegalArgumentException("topicName config was not
passed at startup");
- }
-
- if (maxPollRecords == -1) {
- maxPollRecords = Integer.parseInt(DEFAULT_MAX_POLL_RECORDS);
- }
-
- if (batchSizeBytes == -1) {
- batchSizeBytes = Integer.parseInt(DEFAULT_BATCH_SIZE_BYTES);
- }
-
- if (bufferMemoryBytes == -1) {
- bufferMemoryBytes = Integer.parseInt(DEFAULT_BUFFER_MEMORY_BYTES);
- }
-
- if (lingerMs == -1) {
- lingerMs = Integer.parseInt(DEFAULT_LINGER_MS);
- }
-
- if (requestTimeout == -1) {
- requestTimeout = Integer.parseInt(DEFAULT_REQUEST_TIMEOUT);
- }
-
- if (fetchMinBytes == -1) {
- fetchMinBytes = Integer.parseInt(DEFAULT_FETCH_MIN_BYTES);
- }
+ public void start(Map<String, String> properties) {
- if (fetchMaxWaitMS == -1) {
- fetchMaxWaitMS = Integer.parseInt(DEFAULT_FETCH_MAX_WAIT_MS);
- }
//server = new Server();
//ServerConnector connector = new ServerConnector(server);
//connector.setPort(port);
//server.setConnectors(new Connector[] {connector})
-
- crossDcConsumer = getCrossDcConsumer(bootstrapServers,
zkConnectString, topicName, groupId, maxPollRecords, batchSizeBytes,
bufferMemoryBytes, lingerMs,
- requestTimeout, fetchMinBytes, fetchMaxWaitMS,
enableDataEncryption);
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+ crossDcConsumer = getCrossDcConsumer(conf);
// Start consumer thread
- log.info("Starting CrossDC Consumer bootstrapServers={},
zkConnectString={}, topicName={}, groupId={}, enableDataEncryption={}",
bootstrapServers, zkConnectString, topicName, groupId, enableDataEncryption);
+ log.info("Starting CrossDC Consumer {}", conf);
consumerThreadExecutor = Executors.newSingleThreadExecutor();
consumerThreadExecutor.submit(crossDcConsumer);
@@ -121,58 +78,41 @@ public class Consumer {
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
- private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String
zkConnectString, String topicName, String groupId, int maxPollRecords,
- int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int
requestTimeout, int fetchMinBytes, int fetchMaxWaitMS, boolean
enableDataEncryption) {
-
- KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers,
topicName, groupId, maxPollRecords, batchSizeBytes, bufferMemoryBytes, lingerMs,
- requestTimeout, fetchMinBytes, fetchMaxWaitMS,
enableDataEncryption, zkConnectString);
+ private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
return new KafkaCrossDcConsumer(conf);
}
public static void main(String[] args) {
- String zkConnectString = System.getProperty("zkConnectString");
- if (zkConnectString == null || zkConnectString.isBlank()) {
- throw new IllegalArgumentException("zkConnectString not specified
for producer");
+ Map<String,String> properties = new HashMap<>();
+
+ for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+ properties.put(configKey.getKey(),
System.getProperty(configKey.getKey()));
}
- String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS);
- // boolean enableDataEncryption =
Boolean.getBoolean("enableDataEncryption");
- String topicName = System.getProperty(TOPIC_NAME);
- String port = System.getProperty(PORT);
- String groupId = System.getProperty(GROUP_ID, "");
- String maxPollRecords = System.getProperty("maxPollRecords");
- String batchSizeBytes = System.getProperty("batchSizeBytes");
- String bufferMemoryBytes = System.getProperty("bufferMemoryBytes");
- String lingerMs = System.getProperty("lingerMs");
- String requestTimeout = System.getProperty("requestTimeout");
- String fetchMinBytes = System.getProperty("fetchMinBytes");
- String fetchMaxWaitMS = System.getProperty("fetchMaxWaitMS");
+ String zkConnectString =
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
+ if (zkConnectString == null || zkConnectString.isBlank()) {
+ throw new IllegalArgumentException("zkConnectString not specified
for Consumer");
+ }
try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
try {
- if ((topicName == null || topicName.isBlank()) || (groupId ==
null || groupId.isBlank())
- || (bootstrapServers == null ||
bootstrapServers.isBlank()) || (port == null || port.isBlank()) ||
(maxPollRecords == null || maxPollRecords.isBlank())
- || (batchSizeBytes == null || batchSizeBytes.isBlank()) ||
(bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) || (lingerMs == null
|| lingerMs.isBlank())
- || (requestTimeout == null || requestTimeout.isBlank())
|| (fetchMinBytes == null || fetchMinBytes.isBlank()) || (fetchMaxWaitMS ==
null || fetchMaxWaitMS.isBlank())
- &&
client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
+ if
(client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
byte[] data =
client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
- Properties props = new Properties();
- props.load(new ByteArrayInputStream(data));
-
- topicName = getConfig(TOPIC_NAME, topicName, props);
- bootstrapServers = getConfig(BOOTSTRAP_SERVERS,
bootstrapServers, props);
- port = getConfig(PORT, port, props);
- groupId = getConfig(GROUP_ID, groupId, props);
- maxPollRecords = getConfig(MAX_POLL_RECORDS,
maxPollRecords, props);
- batchSizeBytes = getConfig("batchSizeBytes",
batchSizeBytes, props);
- bufferMemoryBytes = getConfig("bufferMemoryBytes",
bufferMemoryBytes, props);
- lingerMs = getConfig("lingerMs", lingerMs, props);
- requestTimeout = getConfig("requestTimeout",
requestTimeout, props);
- fetchMinBytes = getConfig("fetchMinBytes", fetchMinBytes,
props);
- fetchMaxWaitMS = getConfig("fetchMaxWaitMS",
fetchMaxWaitMS, props);
-
+ Properties zkProps = new Properties();
+ zkProps.load(new ByteArrayInputStream(data));
+                    Properties zkPropsUnproccessed = new Properties();
+                    zkPropsUnproccessed.putAll(zkProps);
+
+ for (ConfigProperty configKey :
KafkaCrossDcConf.CONFIG_PROPERTIES) {
+ if (properties.get(configKey.getKey()) == null ||
properties.get(configKey.getKey()).isBlank()) {
+ properties.put(configKey.getKey(), (String)
zkProps.get(configKey.getKey()));
+ zkPropsUnproccessed.remove(configKey.getKey());
+ }
+ }
+ zkPropsUnproccessed.forEach((k, v) -> {
+ properties.put((String) k, (String) v);
+ });
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -182,54 +122,18 @@ public class Consumer {
}
}
- if (port == null) {
- port = DEFAULT_PORT;
- }
-
+ String bootstrapServers =
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
if (bootstrapServers == null || bootstrapServers.isBlank()) {
- throw new IllegalArgumentException("boostrapServers not specified
for producer");
- }
- if (topicName == null || topicName.isBlank()) {
- throw new IllegalArgumentException("topicName not specified for
producer");
- }
-
- if (groupId.isBlank()) {
- groupId = DEFAULT_GROUP_ID;
+ throw new IllegalArgumentException("bootstrapServers not specified
for Consumer");
}
- if (maxPollRecords == null || maxPollRecords.isBlank()) {
- maxPollRecords = DEFAULT_MAX_POLL_RECORDS;
- }
- if (batchSizeBytes == null || batchSizeBytes.isBlank()) {
- batchSizeBytes = DEFAULT_BATCH_SIZE_BYTES;
- }
- if (bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) {
- bufferMemoryBytes = DEFAULT_BUFFER_MEMORY_BYTES;
- }
- if (lingerMs == null || lingerMs.isBlank()) {
- lingerMs = DEFAULT_LINGER_MS;
- }
- if (requestTimeout == null || requestTimeout.isBlank()) {
- requestTimeout = DEFAULT_REQUEST_TIMEOUT;
- }
- if (fetchMinBytes == null || fetchMinBytes.isBlank()) {
- fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;
- }
- if (fetchMaxWaitMS == null || fetchMaxWaitMS.isBlank()) {
- fetchMaxWaitMS = DEFAULT_FETCH_MAX_WAIT_MS;
+ String topicName = properties.get(TOPIC_NAME);
+ if (topicName == null || topicName.isBlank()) {
+ throw new IllegalArgumentException("topicName not specified for
Consumer");
}
Consumer consumer = new Consumer();
- consumer.start(bootstrapServers, zkConnectString, topicName, groupId,
Integer.parseInt(maxPollRecords),
- Integer.parseInt(batchSizeBytes),
Integer.parseInt(bufferMemoryBytes), Integer.parseInt(lingerMs),
- Integer.parseInt(requestTimeout), Integer.parseInt(fetchMinBytes),
Integer.parseInt(fetchMaxWaitMS), false, Integer.parseInt(port));
- }
-
- private static String getConfig(String configName, String configValue,
Properties props) {
- if (configValue == null || configValue.isBlank()) {
- configValue = props.getProperty(configName);
- }
- return configValue;
+ consumer.start(properties);
}
public void shutdown() {
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 9d7fcc9..9f95a90 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -8,6 +8,7 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.crossdc.common.*;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
@@ -33,7 +34,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final KafkaConsumer<String, MirroredSolrRequest> consumer;
private final KafkaMirroringSink kafkaMirroringSink;
- private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+ private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
private final String topicName;
SolrMessageProcessor messageProcessor;
@@ -43,25 +44,30 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
* @param conf The Kafka consumer configuration
*/
public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
- this.topicName = conf.getTopicName();
+ this.topicName = conf.get(KafkaCrossDcConf.TOPIC_NAME);
- final Properties kafkaConsumerProp = new Properties();
+ final Properties kafkaConsumerProps = new Properties();
- kafkaConsumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
conf.getBootStrapServers());
+ kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
- kafkaConsumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getGroupId());
+ kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,
conf.get(KafkaCrossDcConf.GROUP_ID));
- kafkaConsumerProp.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
conf.getMaxPollRecords());
+ kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
- kafkaConsumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
- kafkaConsumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- kafkaConsumerProp.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
conf.getFetchMinBytes());
- kafkaConsumerProp.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
conf.getFetchMaxWaitMS());
+ kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+ kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+
+ kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+ kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+
+ kafkaConsumerProps.putAll(conf.getAdditionalProperties());
solrClient =
- new
CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()),
+ new
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
Optional.empty()).build();
messageProcessor = new SolrMessageProcessor(solrClient, new
ResubmitBackoffPolicy() {
@@ -70,8 +76,8 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
}
});
- log.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProp);
- consumer = createConsumer(kafkaConsumerProp);
+ log.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProps);
+ consumer = createConsumer(kafkaConsumerProps);
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 562b6b0..57a18ad 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -23,6 +23,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.ConfigProperty;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -30,13 +31,14 @@ import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.Properties;
+import java.util.*;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
import static
org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -68,16 +70,22 @@ public class MirroringUpdateRequestProcessorFactory extends
UpdateRequestProcess
/** This is instantiated in inform(SolrCore) and then shared by all
processor instances - visible for testing */
volatile KafkaRequestMirroringHandler mirroringHandler;
- private String topicName;
- private String bootstrapServers;
-
- private Integer batchSizeBytes;
- private Integer bufferMemoryBytes;
- private Integer lingerMs;
- private Integer requestTimeout;
+// private String topicName;
+// private String bootstrapServers;
+//
+// private Integer batchSizeBytes;
+// private Integer bufferMemoryBytes;
+// private Integer lingerMs;
+// private Integer requestTimeout;
+//
+// private Integer maxRequestSize;
+//
+// private String enableDataCompression;
private boolean enabled = true;
+ private final Map<String,String> properties = new HashMap<>();
+
@Override
public void init(final NamedList args) {
super.init(args);
@@ -87,13 +95,9 @@ public class MirroringUpdateRequestProcessorFactory extends
UpdateRequestProcess
this.enabled = false;
}
- topicName = args._getStr("topicName", null);
- bootstrapServers = args._getStr("bootstrapServers", null);
-
- batchSizeBytes = (Integer) args.get("batchSizeBytes");
- bufferMemoryBytes= (Integer) args.get("bufferMemoryBytes");;
- lingerMs = (Integer) args.get("lingerMs");;
- requestTimeout = (Integer) args.get("requestTimeout");;
+ for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+ properties.put(configKey.getKey(),
args._getStr(configKey.getKey(), null));
+ }
}
private class Closer {
@@ -107,7 +111,7 @@ public class MirroringUpdateRequestProcessorFactory extends
UpdateRequestProcess
try {
this.sink.close();
} catch (IOException e) {
- log.error("Exception closing sink", sink);
+ log.error("Exception closing sink", e);
}
}
@@ -120,40 +124,30 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
if (!enabled) {
return;
}
-
+ Properties zkProps = null;
try {
- if (((topicName == null || topicName.isBlank()) ||
(bootstrapServers == null || bootstrapServers.isBlank()
- || batchSizeBytes == null || bufferMemoryBytes == null
- || lingerMs == null || requestTimeout == null)) &&
core.getCoreContainer().getZkController()
+ if (core.getCoreContainer().getZkController()
.getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
byte[] data =
core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
if (data == null) {
- log.error("crossdc.properties file in Zookeeper has no
data");
- throw new
SolrException(SolrException.ErrorCode.SERVER_ERROR, "crossdc.properties file in
Zookeeper has no data");
+ log.error(KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in
Zookeeper has no data");
+ throw new
SolrException(SolrException.ErrorCode.SERVER_ERROR,
KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
}
- Properties props = new Properties();
- props.load(new ByteArrayInputStream(data));
+ zkProps = new Properties();
+ zkProps.load(new ByteArrayInputStream(data));
+ Properties zkPropsUnprocessed = new Properties(zkProps);
- if (topicName == null || topicName.isBlank()) {
- topicName = props.getProperty("topicName");
- }
- if (bootstrapServers == null || bootstrapServers.isBlank()) {
- bootstrapServers = props.getProperty("bootstrapServers");
- }
- if (batchSizeBytes == null) {
- batchSizeBytes = getIntegerPropValue("batchSizeBytes",
props);
- }
- if (bufferMemoryBytes == null) {
- bufferMemoryBytes =
getIntegerPropValue("bufferMemoryBytes", props);
- }
- if (lingerMs == null) {
- lingerMs = getIntegerPropValue("lingerMs", props);
- }
- if (requestTimeout == null) {
- requestTimeout = getIntegerPropValue("requestTimeout",
props);
+ for (ConfigProperty configKey :
KafkaCrossDcConf.CONFIG_PROPERTIES) {
+ if (properties.get(configKey.getKey()) == null ||
properties.get(configKey.getKey()).isBlank()) {
+ properties.put(configKey.getKey(), (String)
zkProps.get(configKey.getKey()));
+ zkPropsUnprocessed.remove(configKey.getKey());
+ }
}
+ zkPropsUnprocessed.forEach((k, v) -> {
+ properties.put((String) k, (String) v);
+ });
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -164,35 +158,22 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception looking for CrossDC configuration in Zookeeper", e);
}
- if (bootstrapServers == null || bootstrapServers.isBlank()) {
- log.error("boostrapServers not specified for producer in CrossDC
configuration");
+ if (properties.get(BOOTSTRAP_SERVERS) == null ||
properties.get(BOOTSTRAP_SERVERS).isBlank()) {
+ log.error("bootstrapServers not specified for producer in CrossDC
configuration props=" + properties + " zkProps=" + zkProps);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"boostrapServers not specified for producer");
}
- if (topicName == null || topicName.isBlank()) {
- log.error("topicName not specified for producer in CrossDC
configuration");
+ if (properties.get(TOPIC_NAME) == null ||
properties.get(TOPIC_NAME).isBlank()) {
+ log.error("topicName not specified for producer in CrossDC
configuration props=" + properties + " zkProps=" + zkProps);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"topicName not specified for producer");
}
- if (batchSizeBytes == null) {
- batchSizeBytes = Integer.valueOf(DEFAULT_BATCH_SIZE_BYTES);
- }
- if (bufferMemoryBytes == null) {
- bufferMemoryBytes = Integer.valueOf(DEFAULT_BUFFER_MEMORY_BYTES);
- }
- if (lingerMs == null) {
- lingerMs = Integer.valueOf(DEFAULT_LINGER_MS);
- }
- if (requestTimeout == null) {
- requestTimeout = Integer.valueOf(DEFAULT_REQUEST_TIMEOUT);
- }
-
- log.info("bootstrapServers={} topicName={}", bootstrapServers,
topicName);
+ log.info("bootstrapServers={} topicName={}",
properties.get(BOOTSTRAP_SERVERS) , properties.get(TOPIC_NAME));
// load the request mirroring sink class and instantiate.
// mirroringHandler =
core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(),
KafkaRequestMirroringHandler.class);
- KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers,
topicName, "", -1, batchSizeBytes, bufferMemoryBytes, lingerMs,
requestTimeout,-1, -1,false, null);
+ KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
KafkaMirroringSink sink = new KafkaMirroringSink(conf);
Closer closer = new Closer(sink);
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 7817efd..a2928ca 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -14,6 +14,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
import org.junit.After;
import org.junit.AfterClass;
@@ -24,6 +25,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
@ThreadLeakFilters(defaultFilters = true, filters = {
SolrIgnoredThreadsFilter.class,
@@ -104,8 +107,12 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers,
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,-1,-1,-1,
- -1,-1, -1, false, 0);
+ Map<String, String> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ consumer.start(properties);
}
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 1a12a87..b15cc25 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -17,6 +17,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.cloud.ZkTestServer;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
import org.junit.After;
import org.junit.AfterClass;
@@ -27,6 +28,8 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -106,8 +109,12 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers,
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,-1,-1,
- -1,-1,-1, -1, false, 0);
+ Map<String, String> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ consumer.start(properties);
}
private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster
solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 268be5c..4b45506 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.consumer.Consumer;
@@ -29,9 +30,12 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.sys.Prop;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import static org.mockito.Mockito.spy;
@@ -99,8 +103,12 @@ import static org.mockito.Mockito.spy;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers,
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,
- -1,-1,-1,-1, -1,false, 0);
+ Map<String, String> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ consumer.start(properties);
}
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 9ac0071..c0037cd 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -15,6 +15,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.consumer.Consumer;
import org.junit.After;
import org.junit.AfterClass;
@@ -24,9 +25,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
@ThreadLeakFilters(defaultFilters = true, filters = {
SolrIgnoredThreadsFilter.class,
QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
@@ -91,8 +90,13 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers,
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,-1,
- -1,-1, -1, -1, false, 0);
+
+ Map<String, String> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ consumer.start(properties);
}
@@ -183,7 +187,7 @@ import java.util.Properties;
addDocs(client, "third");
foundUpdates = false;
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 1000; i++) {
solrCluster2.getSolrClient().commit(COLLECTION);
solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
results = solrCluster2.getSolrClient().query(COLLECTION, new
SolrQuery("*:*"));
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index 94ad5c5..b52a9c5 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -19,6 +19,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
import org.apache.solr.crossdc.consumer.Consumer;
@@ -31,6 +32,8 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
@ThreadLeakFilters(defaultFilters = true, filters = {
SolrIgnoredThreadsFilter.class,
@@ -78,8 +81,8 @@ import java.util.Properties;
solrCluster1 = new SolrCloudTestCase.Builder(1,
createTempDir()).addConfig("conf",
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
- props.setProperty("topicName", TOPIC);
- props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+ props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS,
kafkaCluster.bootstrapServers());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
props.store(baos, "");
@@ -108,8 +111,12 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- consumer.start(bootstrapServers,
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,-1,
- -1,-1,-1, -1, false, 0);
+ Map<String, String> properties = new HashMap<>();
+ properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+ properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
+ properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+ properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ consumer.start(properties);
}
diff --git a/crossdc-producer/src/test/resources/log4j2.xml
b/crossdc-producer/src/test/resources/log4j2.xml
index 3fd55ed..98c24fc 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -41,7 +41,7 @@
<OnStartupTriggeringPolicy/>
<SizeBasedTriggeringPolicy size="128 MB"/>
</Policies>
- <DefaultRolloverStrategy max="10"/>
+ <DefaultRolloverStrategy max="30"/>
</RollingRandomAccessFile>
</Appenders>
<Loggers>
@@ -59,11 +59,11 @@
<Logger name="org.apache.solr.hadoop" level="INFO"/>
<Logger name="org.eclipse.jetty" level="INFO"/>
- <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer"
level="TRACE"/>
- <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor"
level="TRACE"/>
- <Logger
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler"
level="TRACE"/>
- <Logger
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor"
level="TRACE"/>
- <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink"
level="TRACE"/>
+ <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer"
level="INFO"/>
+ <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor"
level="INFO"/>
+ <Logger
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler"
level="INFO"/>
+ <Logger
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor"
level="INFO"/>
+ <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink"
level="INFO"/>
<Root level="INFO">