This is an automated email from the ASF dual-hosted git repository.

anshum pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new ddfab4b  Configuration improvements, cleanup, minor fixes. (#35)
ddfab4b is described below

commit ddfab4b953897a94f946b558539057cc7e64b23c
Author: Mark Robert Miller <[email protected]>
AuthorDate: Tue Sep 13 14:14:13 2022 -0500

    Configuration improvements, cleanup, minor fixes. (#35)
---
 .../apache/solr/crossdc/common/ConfigProperty.java |  67 ++++++
 .../apache/solr/crossdc/common/CrossDcConf.java    |   2 -
 .../solr/crossdc/common/KafkaCrossDcConf.java      | 257 ++++++++++++---------
 .../solr/crossdc/common/KafkaMirroringSink.java    |  55 +++--
 .../org/apache/solr/crossdc/consumer/Consumer.java | 174 ++++----------
 .../crossdc/consumer/KafkaCrossDcConsumer.java     |  32 +--
 .../MirroringUpdateRequestProcessorFactory.java    | 101 ++++----
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java |  11 +-
 .../solr/crossdc/RetryQueueIntegrationTest.java    |  11 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  12 +-
 .../solr/crossdc/SolrAndKafkaReindexTest.java      |  16 +-
 .../solr/crossdc/ZkConfigIntegrationTest.java      |  15 +-
 crossdc-producer/src/test/resources/log4j2.xml     |  12 +-
 13 files changed, 406 insertions(+), 359 deletions(-)

diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
new file mode 100644
index 0000000..256b6e0
--- /dev/null
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
@@ -0,0 +1,67 @@
+package org.apache.solr.crossdc.common;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class ConfigProperty {
+
+  private final String key;
+  private final String defaultValue;
+
+  private boolean required = false;
+
+  public ConfigProperty(String key, String defaultValue, boolean required) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+    this.required = required;
+  }
+
+  public ConfigProperty(String key, String defaultValue) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+  }
+
+  public ConfigProperty(String key) {
+    this.key = key;
+    this.defaultValue = null;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public boolean isRequired() {
+    return required;
+  }
+
+  public String getDefaultValue() {
+    return defaultValue;
+  }
+
+  public String getValue(Map properties) {
+    String val = (String) properties.get(key);
+    if (val == null) {
+     return defaultValue;
+    }
+    return val;
+  }
+
+  public Integer getValueAsInt(Map properties) {
+    String value = (String) properties.get(key);
+    if (value != null) {
+      return Integer.parseInt(value);
+    }
+    if (defaultValue == null) {
+      return null;
+    }
+    return Integer.parseInt(defaultValue);
+  }
+
+  public Boolean getValueAsBoolean(Map properties) {
+    String value = (String) properties.get(key);
+    if (value != null) {
+      return Boolean.parseBoolean(value);
+    }
+    return Boolean.parseBoolean(defaultValue);
+  }
+}
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
index 5b67f91..b34ae5b 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/CrossDcConf.java
@@ -19,6 +19,4 @@ package org.apache.solr.crossdc.common;
 public abstract class CrossDcConf {
     public static final String CROSSDC_PROPERTIES = "/crossdc.properties";
     public static final String ZK_CROSSDC_PROPS_PATH = "zkCrossDcPropsPath";
-
-    public abstract String getClusterName();
 }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index c1d6149..85738b9 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -16,131 +16,182 @@
  */
 package org.apache.solr.crossdc.common;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 
+import java.util.*;
+
 public class KafkaCrossDcConf extends CrossDcConf {
 
-    public static final String DEFAULT_BATCH_SIZE_BYTES = "512000";
-    public static final String DEFAULT_BUFFER_MEMORY_BYTES = "268435456";
-    public static final String DEFAULT_LINGER_MS = "30";
-    public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
-
-    private final String topicName;
-
-    private final String groupId;
-    private final boolean enableDataEncryption;
-    private final String bootstrapServers;
-    private long slowSubmitThresholdInMillis = 1000;
-    private int numOfRetries = 5;
-    private final String solrZkConnectString;
-
-    private final int maxPollRecords;
-
-    private final int batchSizeBytes;
-    private final int bufferMemoryBytes;
-    private final int lingerMs;
-    private final int requestTimeout;
-
-  private final int fetchMinBytes;
-
-  private final int fetchMaxWaitMS;
-
-  public KafkaCrossDcConf(String bootstrapServers, String topicName, String 
groupId, int maxPollRecords, int batchSizeBytes, int bufferMemoryBytes, int 
lingerMs, int requestTimeout,
-      int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption, 
String solrZkConnectString) {
-        this.bootstrapServers = bootstrapServers;
-        this.topicName = topicName;
-        this.enableDataEncryption = enableDataEncryption;
-        this.solrZkConnectString = solrZkConnectString;
-        this.groupId = groupId;
-        this.maxPollRecords = maxPollRecords;
-        this.batchSizeBytes = batchSizeBytes;
-        this.bufferMemoryBytes = bufferMemoryBytes;
-        this.lingerMs = lingerMs;
-        this.requestTimeout = requestTimeout;
-        this.fetchMinBytes = fetchMinBytes;
-        this.fetchMaxWaitMS = fetchMaxWaitMS;
-    }
-    public String getTopicName() {
-        return topicName;
-    }
+  public static final String DEFAULT_BATCH_SIZE_BYTES = "2097152";
+  public static final String DEFAULT_BUFFER_MEMORY_BYTES = "536870912";
+  public static final String DEFAULT_LINGER_MS = "30";
+  public static final String DEFAULT_REQUEST_TIMEOUT = "60000";
+  public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
+  public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
+  public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
+  public static final String DEFAULT_NUM_RETRIES = null; // by default, we 
control retries with DELIVERY_TIMEOUT_MS_DOC
+  private static final String DEFAULT_RETRY_BACKOFF_MS = "500";
 
-    public boolean getEnableDataEncryption() { return enableDataEncryption; }
+  private static final String DEFAULT_DELIVERY_TIMEOUT_MS = "120000";
 
-    public long getSlowSubmitThresholdInMillis() {
-        return slowSubmitThresholdInMillis;
-    }
+  public static final String DEFAULT_MAX_POLL_RECORDS = "500"; // same default 
as Kafka
 
-    public void setSlowSubmitThresholdInMillis(long 
slowSubmitThresholdInMillis) {
-        this.slowSubmitThresholdInMillis = slowSubmitThresholdInMillis;
-    }
+  private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
+  private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000"; // Kafka 
default is 500
 
-    public int getNumOfRetries() {
-        return numOfRetries;
-    }
+  public static final String DEFAULT_FETCH_MAX_BYTES = "100663296";
 
-    public String getSolrZkConnectString() {
-        return solrZkConnectString;
-    }
+  public static final String DEFAULT_MAX_PARTITION_FETCH_BYTES = "33554432";
 
-    @Override
-    public String getClusterName() {
-        return null;
-    }
+  public static final String DEFAULT_PORT = "8090";
 
-    public String getGroupId() {
-        return groupId;
-  }
+  private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
 
-    public int getMaxPollRecords() {
-        return maxPollRecords;
-    }
 
-    public String getBootStrapServers() {
-        return bootstrapServers;
-    }
+  public static final String TOPIC_NAME = "topicName";
 
-    public int getBatchSizeBytes() {
-        return batchSizeBytes;
-    }
+  public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
 
-    public int getBufferMemoryBytes() {
-        return bufferMemoryBytes;
-    }
+  public static final String BATCH_SIZE_BYTES = "batchSizeBytes";
 
-    public int getLingerMs() {
-        return lingerMs;
-    }
+  public static final String BUFFER_MEMORY_BYTES = "bufferMemoryBytes";
+
+  public static final String LINGER_MS = "lingerMs";
+
+  public static final String REQUEST_TIMEOUT_MS = "requestTimeoutMS";
+
+  public static final String MAX_REQUEST_SIZE_BYTES = "maxRequestSizeBytes";
+
+  public static final String ENABLE_DATA_COMPRESSION = "enableDataCompression";
+
+  public static final String SLOW_SUBMIT_THRESHOLD_MS = 
"slowSubmitThresholdMs";
+
+  public static final String NUM_RETRIES = "numRetries";
+
+  public static final String RETRY_BACKOFF_MS = "retryBackoffMs";
+
+  public static final String DELIVERY_TIMEOUT_MS = "retryBackoffMs";
+
+  public static final String FETCH_MIN_BYTES = "fetchMinBytes";
+
+  public static final String FETCH_MAX_WAIT_MS = "fetchMaxWaitMS";
+
+  public static final String MAX_POLL_RECORDS = "maxPollRecords";
+
+  public static final String FETCH_MAX_BYTES = "fetchMaxBytes";
 
-    public int getRequestTimeout() {
-        return requestTimeout;
+  public static final String MAX_PARTITION_FETCH_BYTES = 
"maxPartitionFetchBytes";
+
+  public static final String ZK_CONNECT_STRING = "zkConnectString";
+
+
+
+
+  public static final List<ConfigProperty> CONFIG_PROPERTIES;
+  private static final HashMap<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+
+  public static final String PORT = "port";
+
+  public static final String GROUP_ID = "groupId";
+
+
+
+  static {
+    CONFIG_PROPERTIES =
+        List.of(new ConfigProperty(TOPIC_NAME), new 
ConfigProperty(BOOTSTRAP_SERVERS),
+            new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
+            new ConfigProperty(BUFFER_MEMORY_BYTES, 
DEFAULT_BUFFER_MEMORY_BYTES),
+            new ConfigProperty(LINGER_MS, DEFAULT_LINGER_MS),
+            new ConfigProperty(REQUEST_TIMEOUT_MS, DEFAULT_REQUEST_TIMEOUT),
+            new ConfigProperty(MAX_REQUEST_SIZE_BYTES, 
DEFAULT_MAX_REQUEST_SIZE),
+            new ConfigProperty(ENABLE_DATA_COMPRESSION, 
DEFAULT_ENABLE_DATA_COMPRESSION),
+            new ConfigProperty(SLOW_SUBMIT_THRESHOLD_MS, 
DEFAULT_SLOW_SEND_THRESHOLD),
+            new ConfigProperty(NUM_RETRIES, DEFAULT_NUM_RETRIES),
+            new ConfigProperty(RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MS),
+            new ConfigProperty(DELIVERY_TIMEOUT_MS, 
DEFAULT_DELIVERY_TIMEOUT_MS),
+
+            // Consumer only zkConnectString
+            new ConfigProperty(ZK_CONNECT_STRING, null),
+            new ConfigProperty(FETCH_MIN_BYTES, DEFAULT_FETCH_MIN_BYTES),
+            new ConfigProperty(FETCH_MAX_BYTES, DEFAULT_FETCH_MAX_BYTES),
+            new ConfigProperty(FETCH_MAX_WAIT_MS, DEFAULT_FETCH_MAX_WAIT_MS),
+
+            new ConfigProperty(MAX_PARTITION_FETCH_BYTES, 
DEFAULT_MAX_PARTITION_FETCH_BYTES),
+            new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
+            new ConfigProperty(PORT, DEFAULT_PORT),
+            new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID)
+            );
+
+
+
+    CONFIG_PROPERTIES_MAP = new HashMap<String, 
ConfigProperty>(CONFIG_PROPERTIES.size());
+    for (ConfigProperty prop : CONFIG_PROPERTIES) {
+      CONFIG_PROPERTIES_MAP.put(prop.getKey(), prop);
     }
+  }
+
+  private final Map<String, String> properties;
+
+  public KafkaCrossDcConf(Map<String, String> properties) {
+    List<String> nullValueKeys = new ArrayList<String>();
+    properties.forEach((k, v) -> {
+      if (v == null) {
+        nullValueKeys.add(k);
+      }
+    });
+    nullValueKeys.forEach(properties::remove);
+    this.properties = properties;
+  }
+
+  public String get(String property) {
+    return CONFIG_PROPERTIES_MAP.get(property).getValue(properties);
+  }
 
-    public int getFetchMinBytes() {
-      return fetchMinBytes;
+  public Integer getInt(String property) {
+    ConfigProperty prop = CONFIG_PROPERTIES_MAP.get(property);
+    if (prop == null) {
+      throw new IllegalArgumentException("Property not found key=" + property);
     }
+    return prop.getValueAsInt(properties);
+  }
 
-    public int getFetchMaxWaitMS() {
-      return fetchMaxWaitMS;
+  public Boolean getBool(String property) {
+    ConfigProperty prop = CONFIG_PROPERTIES_MAP.get(property);
+    if (prop == null) {
+      throw new IllegalArgumentException("Property not found key=" + property);
     }
+    return prop.getValueAsBoolean(properties);
+  }
+  
+  public Properties getAdditionalProperties() {
+    Properties additional = new Properties();
+    additional.putAll(properties);
+    for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
+      additional.remove(configProperty.getKey());
+    }
+    Map<String, Object> integerProperties = new HashMap<>();
+    additional.forEach((k, v) -> {
+      try {
+        int intVal = Integer.parseInt((String) v);
+        integerProperties.put(k.toString(), intVal);
+      } catch (NumberFormatException ignored) {
+
+      }
+    });
+    additional.putAll(integerProperties);
+    return additional;
+  }
 
-    @Override
-    public String toString() {
-        return String.format("KafkaCrossDcConf{" +
-                "topicName='%s', " +
-                "groupId='%s', " +
-                "enableDataEncryption='%b', " +
-                "bootstrapServers='%s', " +
-                "slowSubmitThresholdInMillis='%d', " +
-                "numOfRetries='%d', " +
-                "batchSizeBytes='%d', " +
-                "bufferMemoryBytes='%d', " +
-                "lingerMs='%d', " +
-                "requestTimeout='%d', " +
-                "fetchMinBytes='%d', " +
-                "fetchMaxWaitMS='%d', " +
-                "solrZkConnectString='%s'}",
-                topicName, groupId, enableDataEncryption, bootstrapServers,
-                slowSubmitThresholdInMillis, numOfRetries, batchSizeBytes,
-                bufferMemoryBytes, lingerMs, requestTimeout, fetchMinBytes, 
fetchMaxWaitMS, solrZkConnectString);
+  @Override public String toString() {
+    StringBuilder sb = new StringBuilder(128);
+    for (ConfigProperty configProperty : CONFIG_PROPERTIES) {
+      sb.append(configProperty.getKey()).append("=")
+          .append(properties.get(configProperty.getKey())).append(",");
     }
+    sb.setLength(sb.length() - 1);
+
+    return "KafkaCrossDcConf{" + sb.toString() + "}";
+  }
+
 }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 787e6af..5afef79 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -30,12 +30,13 @@ import java.lang.invoke.MethodHandles;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
+import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS;
+
 public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
 
     private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    private long lastSuccessfulEnqueueNanos;
-    private KafkaCrossDcConf conf;
+    private final KafkaCrossDcConf conf;
     private final Producer<String, MirroredSolrRequest> producer;
 
     public KafkaMirroringSink(final KafkaCrossDcConf conf) {
@@ -56,27 +57,25 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         // Create Producer record
         try {
 
-            producer.send(new ProducerRecord(conf.getTopicName(), request), 
(metadata, exception) -> {
-                if (log.isDebugEnabled()) {
-                    log.debug("Producer finished sending metadata={}, 
exception={}", metadata,
-                        exception);
+            producer.send(new 
ProducerRecord<>(conf.get(KafkaCrossDcConf.TOPIC_NAME), request), (metadata, 
exception) -> {
+                if (exception != null) {
+                    log.error("Failed adding update to CrossDC queue! 
request=" + request.getSolrRequest(), exception);
                 }
             });
 
-            lastSuccessfulEnqueueNanos = System.nanoTime();
+            long lastSuccessfulEnqueueNanos = System.nanoTime();
             // Record time since last successful enqueue as 0
             long elapsedTimeMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
             // Update elapsed time
 
-            if (elapsedTimeMillis > conf.getSlowSubmitThresholdInMillis()) {
+            if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
                 slowSubmitAction(request, elapsedTimeMillis);
             }
         } catch (Exception e) {
             // We are intentionally catching all exceptions, the expected 
exception form this function is {@link MirroringException}
-
-            String message = String.format("Unable to enqueue request %s, # of 
attempts %s", request, conf.getNumOfRetries());
+            String message = "Unable to enqueue request " + request + ", 
configured retries is" + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
+                " and configured max delivery timeout in ms is " + 
conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
             log.error(message, e);
-
             throw new MirroringException(message, e);
         }
     }
@@ -90,31 +89,39 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
      */
     private Producer<String, MirroredSolrRequest> initProducer() {
         // Initialize and return Kafka producer
-        Properties props = new Properties();
+        Properties kafkaProducerProps = new Properties();
 
         log.info("Creating Kafka producer! Configurations {} ", 
conf.toString());
 
-        props.put("bootstrap.servers", conf.getBootStrapServers());
+        kafkaProducerProps.put("bootstrap.servers", 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+
+        kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
+        if (retries != null) {
+            kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 
Integer.parseInt(retries));
+        }
+        kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
+        kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
+        kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 
conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
+        kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 
conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
+        kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.LINGER_MS));
+        kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time 
that causes consumer to be kicked out
+        kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));
 
-        props.put("acks", "all");
-        props.put("retries", 3);
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getBatchSizeBytes());
-        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 
conf.getBufferMemoryBytes());
-        props.put(ProducerConfig.LINGER_MS_CONFIG, conf.getLingerMs());
-        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
conf.getRequestTimeout()); // should be less than time that causes consumer to 
be kicked out
+        kafkaProducerProps.put("key.serializer", 
StringSerializer.class.getName());
+        kafkaProducerProps.put("value.serializer", 
MirroredSolrRequestSerializer.class.getName());
 
-        props.put("key.serializer", StringSerializer.class.getName());
-        props.put("value.serializer", 
MirroredSolrRequestSerializer.class.getName());
+        kafkaProducerProps.putAll(conf.getAdditionalProperties());
 
         if (log.isDebugEnabled()) {
-            log.debug("Kafka Producer props={}", props);
+            log.debug("Kafka Producer props={}", kafkaProducerProps);
         }
 
         ClassLoader originalContextClassLoader = 
Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(null);
         Producer<String, MirroredSolrRequest> producer;
         try {
-            producer = new KafkaProducer(props);
+            producer = new KafkaProducer<>(kafkaProducerProps);
         } finally {
             
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
         }
@@ -123,7 +130,7 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
 
     private void slowSubmitAction(Object request, long elapsedTimeMillis) {
         log.warn("Enqueuing the request to Kafka took more than {} millis. 
enqueueElapsedTime={}",
-                conf.getSlowSubmitThresholdInMillis(),
+                conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
                 elapsedTimeMillis);
     }
 
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index a19da19..f77a5bb 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -18,6 +18,7 @@ package org.apache.solr.crossdc.consumer;
 
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.cloud.SolrZkClient;
+import org.apache.solr.crossdc.common.ConfigProperty;
 import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
@@ -29,6 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -37,17 +40,9 @@ import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
 
 // Cross-DC Consumer main class
 public class Consumer {
-    public static final String DEFAULT_PORT = "8090";
-    public static final String TOPIC_NAME = "topicName";
-    public static final String GROUP_ID = "groupId";
-    public static final String PORT = "port";
-    public static final String BOOTSTRAP_SERVERS = "bootstrapServers";
-    private static final String DEFAULT_GROUP_ID = "SolrCrossDCConsumer";
-    private static final String MAX_POLL_RECORDS = "maxPollRecords";
-    public static final String DEFAULT_MAX_POLL_RECORDS = "500";
-
-    private static final String DEFAULT_FETCH_MIN_BYTES = "512000";
-    private static final String DEFAULT_FETCH_MAX_WAIT_MS = "1000";
+
+
+
 
     private static boolean enabled = true;
 
@@ -61,57 +56,19 @@ public class Consumer {
     private Server server;
     CrossDcConsumer crossDcConsumer;
 
-    public void start(String bootstrapServers, String zkConnectString, String 
topicName, String groupId, int maxPollRecords, int batchSizeBytes, int 
bufferMemoryBytes, int lingerMs, int requestTimeout,
-        int fetchMinBytes, int fetchMaxWaitMS, boolean enableDataEncryption, 
int port) {
-        if (bootstrapServers == null) {
-            throw new IllegalArgumentException("bootstrapServers config was 
not passed at startup");
-        }
-        if (zkConnectString == null) {
-            throw new IllegalArgumentException("zkConnectString config was not 
passed at startup");
-        }
-        if (topicName == null) {
-            throw new IllegalArgumentException("topicName config was not 
passed at startup");
-        }
-
-        if (maxPollRecords == -1) {
-            maxPollRecords = Integer.parseInt(DEFAULT_MAX_POLL_RECORDS);
-        }
-
-        if (batchSizeBytes == -1) {
-            batchSizeBytes = Integer.parseInt(DEFAULT_BATCH_SIZE_BYTES);
-        }
-
-        if (bufferMemoryBytes == -1) {
-            bufferMemoryBytes = Integer.parseInt(DEFAULT_BUFFER_MEMORY_BYTES);
-        }        
-        
-        if (lingerMs == -1) {
-            lingerMs = Integer.parseInt(DEFAULT_LINGER_MS);
-        }
-
-        if (requestTimeout == -1) {
-            requestTimeout = Integer.parseInt(DEFAULT_REQUEST_TIMEOUT);
-        }
-
-        if (fetchMinBytes == -1) {
-            fetchMinBytes = Integer.parseInt(DEFAULT_FETCH_MIN_BYTES);
-        }
+    public void start(Map<String, String> properties) {
 
-        if (fetchMaxWaitMS == -1) {
-            fetchMaxWaitMS = Integer.parseInt(DEFAULT_FETCH_MAX_WAIT_MS);
-        }
 
         //server = new Server();
         //ServerConnector connector = new ServerConnector(server);
         //connector.setPort(port);
         //server.setConnectors(new Connector[] {connector})
-
-        crossDcConsumer = getCrossDcConsumer(bootstrapServers, 
zkConnectString, topicName, groupId, maxPollRecords, batchSizeBytes, 
bufferMemoryBytes, lingerMs,
-            requestTimeout, fetchMinBytes, fetchMaxWaitMS, 
enableDataEncryption);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
+        crossDcConsumer = getCrossDcConsumer(conf);
 
         // Start consumer thread
 
-        log.info("Starting CrossDC Consumer bootstrapServers={}, 
zkConnectString={}, topicName={}, groupId={}, enableDataEncryption={}", 
bootstrapServers, zkConnectString, topicName, groupId, enableDataEncryption);
+        log.info("Starting CrossDC Consumer {}", conf);
 
         consumerThreadExecutor = Executors.newSingleThreadExecutor();
         consumerThreadExecutor.submit(crossDcConsumer);
@@ -121,58 +78,41 @@ public class Consumer {
         Runtime.getRuntime().addShutdownHook(shutdownHook);
     }
 
-    private CrossDcConsumer getCrossDcConsumer(String bootstrapServers, String 
zkConnectString, String topicName, String groupId, int maxPollRecords,
-        int batchSizeBytes, int bufferMemoryBytes, int lingerMs, int 
requestTimeout, int fetchMinBytes, int fetchMaxWaitMS, boolean 
enableDataEncryption) {
-
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, 
topicName, groupId, maxPollRecords, batchSizeBytes, bufferMemoryBytes, lingerMs,
-            requestTimeout, fetchMinBytes, fetchMaxWaitMS, 
enableDataEncryption, zkConnectString);
+    private CrossDcConsumer getCrossDcConsumer(KafkaCrossDcConf conf) {
         return new KafkaCrossDcConsumer(conf);
     }
 
     public static void main(String[] args) {
 
-        String zkConnectString = System.getProperty("zkConnectString");
-        if (zkConnectString == null || zkConnectString.isBlank()) {
-            throw new IllegalArgumentException("zkConnectString not specified 
for producer");
+        Map<String,String> properties = new HashMap<>();
+
+        for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+            properties.put(configKey.getKey(), 
System.getProperty(configKey.getKey()));
         }
 
-        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS);
-        // boolean enableDataEncryption = 
Boolean.getBoolean("enableDataEncryption");
-        String topicName = System.getProperty(TOPIC_NAME);
-        String port = System.getProperty(PORT);
-        String groupId = System.getProperty(GROUP_ID, "");
-        String maxPollRecords = System.getProperty("maxPollRecords");
-        String batchSizeBytes = System.getProperty("batchSizeBytes");
-        String bufferMemoryBytes = System.getProperty("bufferMemoryBytes");
-        String lingerMs = System.getProperty("lingerMs");
-        String requestTimeout = System.getProperty("requestTimeout");
-        String fetchMinBytes = System.getProperty("fetchMinBytes");
-        String fetchMaxWaitMS = System.getProperty("fetchMaxWaitMS");
+        String zkConnectString = 
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
+        if (zkConnectString == null || zkConnectString.isBlank()) {
+            throw new IllegalArgumentException("zkConnectString not specified 
for Consumer");
+        }
 
         try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
 
             try {
-                if ((topicName == null || topicName.isBlank()) || (groupId == 
null || groupId.isBlank())
-                    || (bootstrapServers == null || 
bootstrapServers.isBlank()) || (port == null || port.isBlank()) || 
(maxPollRecords == null || maxPollRecords.isBlank())
-                    || (batchSizeBytes == null || batchSizeBytes.isBlank()) || 
(bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) || (lingerMs == null 
|| lingerMs.isBlank())
-                    || (requestTimeout == null || requestTimeout.isBlank())  
|| (fetchMinBytes == null || fetchMinBytes.isBlank())  || (fetchMaxWaitMS == 
null || fetchMaxWaitMS.isBlank())
-                    && 
client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, 
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
+                if 
(client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, 
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
                     byte[] data = 
client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, 
KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
-                    Properties props = new Properties();
-                    props.load(new ByteArrayInputStream(data));
-
-                    topicName = getConfig(TOPIC_NAME, topicName, props);
-                    bootstrapServers = getConfig(BOOTSTRAP_SERVERS, 
bootstrapServers, props);
-                    port = getConfig(PORT, port, props);
-                    groupId = getConfig(GROUP_ID, groupId, props);
-                    maxPollRecords = getConfig(MAX_POLL_RECORDS, 
maxPollRecords, props);
-                    batchSizeBytes = getConfig("batchSizeBytes", 
batchSizeBytes, props);
-                    bufferMemoryBytes = getConfig("bufferMemoryBytes", 
bufferMemoryBytes, props);
-                    lingerMs = getConfig("lingerMs", lingerMs, props);
-                    requestTimeout = getConfig("requestTimeout", 
requestTimeout, props);
-                    fetchMinBytes = getConfig("fetchMinBytes", fetchMinBytes, 
props);
-                    fetchMaxWaitMS = getConfig("fetchMaxWaitMS", 
fetchMaxWaitMS, props);
-
+                    Properties zkProps = new Properties();
+                    zkProps.load(new ByteArrayInputStream(data));
+                    Properties zkPropsUnproccessed = new Properties(zkProps);
+
+                    for (ConfigProperty configKey : 
KafkaCrossDcConf.CONFIG_PROPERTIES) {
+                        if (properties.get(configKey.getKey()) == null || 
properties.get(configKey.getKey()).isBlank()) {
+                            properties.put(configKey.getKey(), (String) 
zkProps.get(configKey.getKey()));
+                            zkPropsUnproccessed.remove(configKey.getKey());
+                        }
+                    }
+                    zkPropsUnproccessed.forEach((k, v) -> {
+                        properties.put((String) k, (String) v);
+                    });
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -182,54 +122,18 @@ public class Consumer {
             }
         }
 
-        if (port == null) {
-            port = DEFAULT_PORT;
-        }
-
+        String bootstrapServers = 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
         if (bootstrapServers == null || bootstrapServers.isBlank()) {
-          throw new IllegalArgumentException("boostrapServers not specified 
for producer");
-        }
-        if (topicName == null || topicName.isBlank()) {
-            throw new IllegalArgumentException("topicName not specified for 
producer");
-        }
-
-        if (groupId.isBlank()) {
-            groupId = DEFAULT_GROUP_ID;
+            throw new IllegalArgumentException("bootstrapServers not specified 
for Consumer");
         }
 
-        if (maxPollRecords == null || maxPollRecords.isBlank()) {
-            maxPollRecords = DEFAULT_MAX_POLL_RECORDS;
-        }
-        if (batchSizeBytes == null || batchSizeBytes.isBlank()) {
-            batchSizeBytes = DEFAULT_BATCH_SIZE_BYTES;
-        }
-        if (bufferMemoryBytes == null || bufferMemoryBytes.isBlank()) {
-            bufferMemoryBytes = DEFAULT_BUFFER_MEMORY_BYTES;
-        }
-        if (lingerMs == null || lingerMs.isBlank()) {
-            lingerMs = DEFAULT_LINGER_MS;
-        }
-        if (requestTimeout == null || requestTimeout.isBlank()) {
-            requestTimeout = DEFAULT_REQUEST_TIMEOUT;
-        }
-        if (fetchMinBytes == null || fetchMinBytes.isBlank()) {
-            fetchMinBytes = DEFAULT_FETCH_MIN_BYTES;
-        }
-        if (fetchMaxWaitMS == null || fetchMaxWaitMS.isBlank()) {
-            fetchMaxWaitMS = DEFAULT_FETCH_MAX_WAIT_MS;
+        String topicName = properties.get(TOPIC_NAME);
+        if (topicName == null || topicName.isBlank()) {
+            throw new IllegalArgumentException("topicName not specified for 
Consumer");
         }
 
         Consumer consumer = new Consumer();
-        consumer.start(bootstrapServers, zkConnectString, topicName, groupId, 
Integer.parseInt(maxPollRecords),
-            Integer.parseInt(batchSizeBytes), 
Integer.parseInt(bufferMemoryBytes), Integer.parseInt(lingerMs),
-            Integer.parseInt(requestTimeout), Integer.parseInt(fetchMinBytes), 
Integer.parseInt(fetchMaxWaitMS), false, Integer.parseInt(port));
-    }
-
-    private static String getConfig(String configName, String configValue, 
Properties props) {
-        if (configValue == null || configValue.isBlank()) {
-            configValue = props.getProperty(configName);
-        }
-        return configValue;
+        consumer.start(properties);
     }
 
     public void shutdown() {
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 9d7fcc9..9f95a90 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -8,6 +8,7 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.crossdc.common.*;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
@@ -33,7 +34,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   private final KafkaConsumer<String, MirroredSolrRequest> consumer;
   private final KafkaMirroringSink kafkaMirroringSink;
 
-  private final int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
+  private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
   private final String topicName;
   SolrMessageProcessor messageProcessor;
 
@@ -43,25 +44,30 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
    * @param conf The Kafka consumer configuration
    */
   public KafkaCrossDcConsumer(KafkaCrossDcConf conf) {
-    this.topicName = conf.getTopicName();
+    this.topicName = conf.get(KafkaCrossDcConf.TOPIC_NAME);
 
-    final Properties kafkaConsumerProp = new Properties();
+    final Properties kafkaConsumerProps = new Properties();
 
-    kafkaConsumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.getBootStrapServers());
+    kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 
-    kafkaConsumerProp.put(ConsumerConfig.GROUP_ID_CONFIG, conf.getGroupId());
+    kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, 
conf.get(KafkaCrossDcConf.GROUP_ID));
 
-    kafkaConsumerProp.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getMaxPollRecords());
+    kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
 
-    kafkaConsumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
 
-    kafkaConsumerProp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
-    kafkaConsumerProp.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getFetchMinBytes());
-    kafkaConsumerProp.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getFetchMaxWaitMS());
+    kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+
+    kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+    kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+
+    kafkaConsumerProps.putAll(conf.getAdditionalProperties());
 
     solrClient =
-        new 
CloudSolrClient.Builder(Collections.singletonList(conf.getSolrZkConnectString()),
+        new 
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
             Optional.empty()).build();
 
     messageProcessor = new SolrMessageProcessor(solrClient, new 
ResubmitBackoffPolicy() {
@@ -70,8 +76,8 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
       }
     });
 
-    log.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProp);
-    consumer = createConsumer(kafkaConsumerProp);
+    log.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProps);
+    consumer = createConsumer(kafkaConsumerProps);
 
     // Create producer for resubmitting failed requests
     log.info("Creating Kafka resubmit producer");
diff --git 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 562b6b0..57a18ad 100644
--- 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -23,6 +23,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.CloseHook;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.core.SolrCore;
+import org.apache.solr.crossdc.common.ConfigProperty;
 import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.KafkaMirroringSink;
@@ -30,13 +31,14 @@ import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.util.plugin.SolrCoreAware;
 import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.units.qual.C;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.Properties;
+import java.util.*;
 
 import static org.apache.solr.update.processor.DistributedUpdateProcessor.*;
 import static 
org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
@@ -68,16 +70,22 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
 
     /** This is instantiated in inform(SolrCore) and then shared by all 
processor instances - visible for testing */
     volatile KafkaRequestMirroringHandler mirroringHandler;
-    private String topicName;
-    private String bootstrapServers;
-
-    private Integer batchSizeBytes;
-    private Integer bufferMemoryBytes;
-    private Integer lingerMs;
-    private Integer requestTimeout;
+//    private String topicName;
+//    private String bootstrapServers;
+//
+//    private Integer batchSizeBytes;
+//    private Integer bufferMemoryBytes;
+//    private Integer lingerMs;
+//    private Integer requestTimeout;
+//
+//    private Integer maxRequestSize;
+//
+//    private String enableDataCompression;
 
     private boolean enabled = true;
 
+    private final Map<String,String> properties = new HashMap<>();
+
     @Override
     public void init(final NamedList args) {
         super.init(args);
@@ -87,13 +95,9 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
             this.enabled = false;
         }
 
-        topicName = args._getStr("topicName", null);
-        bootstrapServers = args._getStr("bootstrapServers", null);
-
-        batchSizeBytes = (Integer) args.get("batchSizeBytes");
-        bufferMemoryBytes= (Integer) args.get("bufferMemoryBytes");;
-        lingerMs = (Integer) args.get("lingerMs");;
-        requestTimeout = (Integer) args.get("requestTimeout");;
+        for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
+            properties.put(configKey.getKey(), 
args._getStr(configKey.getKey(), null));
+        }
     }
 
     private class Closer {
@@ -107,7 +111,7 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
             try {
                 this.sink.close();
             } catch (IOException e) {
-                log.error("Exception closing sink", sink);
+                log.error("Exception closing sink", e);
             }
         }
 
@@ -120,40 +124,30 @@ public class MirroringUpdateRequestProcessorFactory 
extends UpdateRequestProcess
         if (!enabled) {
             return;
         }
-
+        Properties zkProps = null;
         try {
-            if (((topicName == null || topicName.isBlank()) || 
(bootstrapServers == null || bootstrapServers.isBlank()
-                || batchSizeBytes == null || bufferMemoryBytes == null
-                || lingerMs == null || requestTimeout == null)) && 
core.getCoreContainer().getZkController()
+            if (core.getCoreContainer().getZkController()
                 
.getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, 
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
                 byte[] data = 
core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
 KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
 
                 if (data == null) {
-                    log.error("crossdc.properties file in Zookeeper has no 
data");
-                    throw new 
SolrException(SolrException.ErrorCode.SERVER_ERROR, "crossdc.properties file in 
Zookeeper has no data");
+                    log.error(KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in 
Zookeeper has no data");
+                    throw new 
SolrException(SolrException.ErrorCode.SERVER_ERROR, 
KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
                 }
 
-                Properties props = new Properties();
-                props.load(new ByteArrayInputStream(data));
+                zkProps = new Properties();
+                zkProps.load(new ByteArrayInputStream(data));
+                Properties zkPropsUnproccessed = new Properties(zkProps);
 
-                if (topicName == null || topicName.isBlank()) {
-                    topicName = props.getProperty("topicName");
-                }
-                if (bootstrapServers == null || bootstrapServers.isBlank()) {
-                    bootstrapServers = props.getProperty("bootstrapServers");
-                }
-                if (batchSizeBytes == null) {
-                    batchSizeBytes = getIntegerPropValue("batchSizeBytes", 
props);
-                }
-                if (bufferMemoryBytes == null) {
-                    bufferMemoryBytes = 
getIntegerPropValue("bufferMemoryBytes", props);
-                }
-                if (lingerMs == null) {
-                    lingerMs = getIntegerPropValue("lingerMs", props);
-                }
-                if (requestTimeout == null) {
-                    requestTimeout = getIntegerPropValue("requestTimeout", 
props);
+                for (ConfigProperty configKey : 
KafkaCrossDcConf.CONFIG_PROPERTIES) {
+                    if (properties.get(configKey.getKey()) == null || 
properties.get(configKey.getKey()).isBlank()) {
+                        properties.put(configKey.getKey(), (String) 
zkProps.get(configKey.getKey()));
+                        zkPropsUnproccessed.remove(configKey.getKey());
+                    }
                 }
+                zkPropsUnproccessed.forEach((k, v) -> {
+                    properties.put((String) k, (String) v);
+                });
              }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -164,35 +158,22 @@ public class MirroringUpdateRequestProcessorFactory 
extends UpdateRequestProcess
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"Exception looking for CrossDC configuration in Zookeeper", e);
         }
 
-        if (bootstrapServers == null || bootstrapServers.isBlank()) {
-           log.error("boostrapServers not specified for producer in CrossDC 
configuration");
+        if (properties.get(BOOTSTRAP_SERVERS) == null || 
properties.get(BOOTSTRAP_SERVERS).isBlank()) {
+           log.error("boostrapServers not specified for producer in CrossDC 
configuration props=" + properties + " zkProps=" + zkProps);
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"boostrapServers not specified for producer");
        }
         
-        if (topicName == null || topicName.isBlank()) {
-            log.error("topicName not specified for producer in CrossDC 
configuration");
+        if (properties.get(TOPIC_NAME) == null || 
properties.get(TOPIC_NAME).isBlank()) {
+            log.error("topicName not specified for producer in CrossDC 
configuration props=" + properties + " zkProps=" + zkProps);
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, 
"topicName not specified for producer");
         }
 
-        if (batchSizeBytes == null) {
-            batchSizeBytes = Integer.valueOf(DEFAULT_BATCH_SIZE_BYTES);
-        }
-        if (bufferMemoryBytes == null) {
-            bufferMemoryBytes = Integer.valueOf(DEFAULT_BUFFER_MEMORY_BYTES);
-        }
-        if (lingerMs == null) {
-            lingerMs = Integer.valueOf(DEFAULT_LINGER_MS);
-        }
-        if (requestTimeout == null) {
-            requestTimeout = Integer.valueOf(DEFAULT_REQUEST_TIMEOUT);
-        }
-
-        log.info("bootstrapServers={} topicName={}", bootstrapServers, 
topicName);
+        log.info("bootstrapServers={} topicName={}", 
properties.get(BOOTSTRAP_SERVERS) , properties.get(TOPIC_NAME));
 
         // load the request mirroring sink class and instantiate.
        // mirroringHandler = 
core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), 
KafkaRequestMirroringHandler.class);
 
-        KafkaCrossDcConf conf = new KafkaCrossDcConf(bootstrapServers, 
topicName, "",  -1, batchSizeBytes, bufferMemoryBytes, lingerMs, 
requestTimeout,-1, -1,false,  null);
+        KafkaCrossDcConf conf = new KafkaCrossDcConf(properties);
         KafkaMirroringSink sink = new KafkaMirroringSink(conf);
 
         Closer closer = new Closer(sink);
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index 7817efd..a2928ca 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -14,6 +14,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -24,6 +25,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { 
SolrIgnoredThreadsFilter.class,
@@ -104,8 +107,12 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, 
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1,-1,-1,-1,
-        -1,-1, -1, false, 0);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
 
   }
 
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index 1a12a87..b15cc25 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -17,6 +17,7 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -27,6 +28,8 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -106,8 +109,12 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, 
solrCluster2.getZkServer().getZkAddress(), TOPIC,  "group1", -1,-1,-1,
-        -1,-1,-1, -1, false, 0);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
   }
 
   private static MiniSolrCloudCluster startCluster(MiniSolrCloudCluster 
solrCluster, ZkTestServer zkTestServer, Path baseDir) throws Exception {
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 268be5c..4b45506 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
@@ -29,9 +30,12 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.sys.Prop;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.mockito.Mockito.spy;
@@ -99,8 +103,12 @@ import static org.mockito.Mockito.spy;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, 
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,
-        -1,-1,-1,-1, -1,false, 0);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
 
   }
 
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 9ac0071..c0037cd 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -15,6 +15,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.consumer.Consumer;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -24,9 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { 
SolrIgnoredThreadsFilter.class,
     QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class })
@@ -91,8 +90,13 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, 
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,-1,
-        -1,-1, -1, -1, false, 0);
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
 
   }
 
@@ -183,7 +187,7 @@ import java.util.Properties;
     addDocs(client, "third");
 
     foundUpdates = false;
-    for (int i = 0; i < 100; i++) {
+    for (int i = 0; i < 1000; i++) {
       solrCluster2.getSolrClient().commit(COLLECTION);
       solrCluster1.getSolrClient().query(COLLECTION, new SolrQuery("*:*"));
       results = solrCluster2.getSolrClient().query(COLLECTION, new 
SolrQuery("*:*"));
diff --git 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index 94ad5c5..b52a9c5 100644
--- 
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ 
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -19,6 +19,7 @@ import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.util.ObjectReleaseTracker;
+import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.common.MirroredSolrRequest;
 import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer;
 import org.apache.solr.crossdc.consumer.Consumer;
@@ -31,6 +32,8 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 @ThreadLeakFilters(defaultFilters = true, filters = { 
SolrIgnoredThreadsFilter.class,
@@ -78,8 +81,8 @@ import java.util.Properties;
     solrCluster1 = new SolrCloudTestCase.Builder(1, 
createTempDir()).addConfig("conf",
         
getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure();
 
-    props.setProperty("topicName", TOPIC);
-    props.setProperty("bootstrapServers", kafkaCluster.bootstrapServers());
+    props.setProperty(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    props.setProperty(KafkaCrossDcConf.BOOTSTRAP_SERVERS, 
kafkaCluster.bootstrapServers());
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     props.store(baos, "");
@@ -108,8 +111,12 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    consumer.start(bootstrapServers, 
solrCluster2.getZkServer().getZkAddress(), TOPIC, "group1", -1, -1,-1,
-        -1,-1,-1, -1, false, 0);
+    Map<String, String> properties = new HashMap<>();
+    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
+    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, 
solrCluster2.getZkServer().getZkAddress());
+    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
+    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    consumer.start(properties);
 
   }
 
diff --git a/crossdc-producer/src/test/resources/log4j2.xml 
b/crossdc-producer/src/test/resources/log4j2.xml
index 3fd55ed..98c24fc 100644
--- a/crossdc-producer/src/test/resources/log4j2.xml
+++ b/crossdc-producer/src/test/resources/log4j2.xml
@@ -41,7 +41,7 @@
         <OnStartupTriggeringPolicy/>
         <SizeBasedTriggeringPolicy size="128 MB"/>
       </Policies>
-      <DefaultRolloverStrategy max="10"/>
+      <DefaultRolloverStrategy max="30"/>
     </RollingRandomAccessFile>
   </Appenders>
   <Loggers>
@@ -59,11 +59,11 @@
     <Logger name="org.apache.solr.hadoop" level="INFO"/>
     <Logger name="org.eclipse.jetty" level="INFO"/>
 
-    <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" 
level="TRACE"/>
-    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" 
level="TRACE"/>
-    <Logger 
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" 
level="TRACE"/>
-    <Logger 
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" 
level="TRACE"/>
-    <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" 
level="TRACE"/>
+    <Logger name="org.apache.solr.crossdc.consumer.KafkaCrossDcConsumer" 
level="INFO"/>
+    <Logger name="org.apache.solr.update.processor.MirroringUpdateProcessor" 
level="INFO"/>
+    <Logger 
name="org.apache.solr.update.processor.KafkaRequestMirroringHandler" 
level="INFO"/>
+    <Logger 
name="org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor" 
level="INFO"/>
+    <Logger name="org.apache.solr.crossdc.common.KafkaMirroringSink" 
level="INFO"/>
 
 
     <Root level="INFO">

Reply via email to