This is an automated email from the ASF dual-hosted git repository.

markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git


The following commit(s) were added to refs/heads/crossdc-wip by this push:
     new 6c907ca  SSL config passed on to Kafka and a variety of general cleanup. (#37)
6c907ca is described below

commit 6c907cafef7f8d69494820be789f22e3f8ef92df
Author: Mark Robert Miller <[email protected]>
AuthorDate: Wed Sep 14 05:06:17 2022 -0500

    SSL config passed on to Kafka and a variety of general cleanup. (#37)
---
 .../apache/solr/crossdc/common/ConfigProperty.java | 15 +++-
 .../solr/crossdc/common/KafkaCrossDcConf.java      | 58 +++++++++------
 .../solr/crossdc/common/KafkaMirroringSink.java    |  4 +-
 .../solr/crossdc/common/MirroredSolrRequest.java   | 25 -------
 .../common/MirroredSolrRequestSerializer.java      | 32 ++++----
 .../org/apache/solr/crossdc/consumer/Consumer.java | 49 ++++++------
 .../crossdc/consumer/KafkaCrossDcConsumer.java     | 28 +++----
 .../update/processor/MirroringUpdateProcessor.java | 19 +++--
 .../MirroringUpdateRequestProcessorFactory.java    | 86 +++++++++++-----------
 .../apache/solr/crossdc/DeleteByQueryToIdTest.java |  2 +-
 .../solr/crossdc/RetryQueueIntegrationTest.java    |  2 +-
 .../solr/crossdc/SolrAndKafkaIntegrationTest.java  |  2 +-
 .../solr/crossdc/SolrAndKafkaReindexTest.java      |  3 +-
 .../solr/crossdc/ZkConfigIntegrationTest.java      |  2 +-
 14 files changed, 160 insertions(+), 167 deletions(-)

diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
index 256b6e0..8c1af02 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
@@ -1,6 +1,7 @@
 package org.apache.solr.crossdc.common;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 public class ConfigProperty {
@@ -47,9 +48,12 @@ public class ConfigProperty {
   }
 
   public Integer getValueAsInt(Map properties) {
-    String value = (String) properties.get(key);
+    Object value = (Object) properties.get(key);
     if (value != null) {
-      return Integer.parseInt(value);
+      if (value instanceof Integer) {
+        return (Integer) value;
+      }
+      return Integer.parseInt(value.toString());
     }
     if (defaultValue == null) {
       return null;
@@ -58,9 +62,12 @@ public class ConfigProperty {
   }
 
   public Boolean getValueAsBoolean(Map properties) {
-    String value = (String) properties.get(key);
+    Object value = (Object) properties.get(key);
     if (value != null) {
-      return Boolean.parseBoolean(value);
+      if (value instanceof Boolean) {
+        return (Boolean) value;
+      }
+      return Boolean.parseBoolean(value.toString());
     }
     return Boolean.parseBoolean(defaultValue);
   }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index e7e510f..9a14a1b 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -16,10 +16,8 @@
  */
 package org.apache.solr.crossdc.common;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 
@@ -90,10 +88,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
   public static final String ZK_CONNECT_STRING = "zkConnectString";
 
 
-
-
   public static final List<ConfigProperty> CONFIG_PROPERTIES;
-  private static final HashMap<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+  private static final Map<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+
+  public static final List<ConfigProperty> SECURITY_CONFIG_PROPERTIES;
 
   public static final String PORT = "port";
 
@@ -102,7 +100,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
 
 
   static {
-    CONFIG_PROPERTIES =
+    List<ConfigProperty> configProperties = new ArrayList<>(
         List.of(new ConfigProperty(TOPIC_NAME), new 
ConfigProperty(BOOTSTRAP_SERVERS),
             new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
             new ConfigProperty(BUFFER_MEMORY_BYTES, 
DEFAULT_BUFFER_MEMORY_BYTES),
@@ -124,9 +122,11 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(MAX_PARTITION_FETCH_BYTES, 
DEFAULT_MAX_PARTITION_FETCH_BYTES),
             new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
             new ConfigProperty(PORT, DEFAULT_PORT),
-            new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID),
-            
-            // SSL
+            new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID)));
+
+
+    SECURITY_CONFIG_PROPERTIES =
+        List.of(
             new ConfigProperty(SslConfigs.SSL_PROTOCOL_CONFIG),
             new ConfigProperty(SslConfigs.SSL_PROVIDER_CONFIG),
             new ConfigProperty(SslConfigs.SSL_CIPHER_SUITES_CONFIG),
@@ -149,22 +149,25 @@ public class KafkaCrossDcConf extends CrossDcConf {
             new ConfigProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG),
 
 
-            // Admin Client Security
-            new ConfigProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG),
+            // From Common and Admin Client Security
+            new ConfigProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG),
             new ConfigProperty(AdminClientConfig.SECURITY_PROVIDERS_CONFIG)
-            );
-
+        );
 
+    configProperties.addAll(SECURITY_CONFIG_PROPERTIES);
+    CONFIG_PROPERTIES = Collections.unmodifiableList(configProperties);
 
-    CONFIG_PROPERTIES_MAP = new HashMap<String, 
ConfigProperty>(CONFIG_PROPERTIES.size());
+    Map<String, ConfigProperty> configPropertiesMap =
+        new HashMap<String, ConfigProperty>(CONFIG_PROPERTIES.size());
     for (ConfigProperty prop : CONFIG_PROPERTIES) {
-      CONFIG_PROPERTIES_MAP.put(prop.getKey(), prop);
+      configPropertiesMap.put(prop.getKey(), prop);
     }
+    CONFIG_PROPERTIES_MAP = configPropertiesMap;
   }
 
-  private final Map<String, String> properties;
+  private final Map<String, Object> properties;
 
-  public KafkaCrossDcConf(Map<String, String> properties) {
+  public KafkaCrossDcConf(Map<String, Object> properties) {
     List<String> nullValueKeys = new ArrayList<String>();
     properties.forEach((k, v) -> {
       if (v == null) {
@@ -175,6 +178,15 @@ public class KafkaCrossDcConf extends CrossDcConf {
     this.properties = properties;
   }
 
+  public static void addSecurityProps(KafkaCrossDcConf conf, Properties 
kafkaConsumerProps) {
+    for (ConfigProperty property : SECURITY_CONFIG_PROPERTIES) {
+      String val = conf.get(property.getKey());
+      if (val != null) {
+        kafkaConsumerProps.put(property.getKey(), val);
+      }
+    }
+  }
+
   public String get(String property) {
     return CONFIG_PROPERTIES_MAP.get(property).getValue(properties);
   }
@@ -202,15 +214,17 @@ public class KafkaCrossDcConf extends CrossDcConf {
       additional.remove(configProperty.getKey());
     }
     Map<String, Object> integerProperties = new HashMap<>();
-    additional.forEach((k, v) -> {
+    additional.forEach((key, v) -> {
       try {
         int intVal = Integer.parseInt((String) v);
-        integerProperties.put(k.toString(), intVal);
+        integerProperties.put(key.toString(), intVal);
       } catch (NumberFormatException ignored) {
 
       }
     });
-    additional.putAll(integerProperties);
+    integerProperties.forEach((key, v) -> {
+      additional.setProperty(key, (String) v);
+    });
     return additional;
   }
 
@@ -222,7 +236,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
     }
     sb.setLength(sb.length() - 1);
 
-    return "KafkaCrossDcConf{" + sb.toString() + "}";
+    return "KafkaCrossDcConf{" + sb + "}";
   }
 
 }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 5afef79..9cfa67f 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -93,7 +93,7 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
 
         log.info("Creating Kafka producer! Configurations {} ", 
conf.toString());
 
-        kafkaProducerProps.put("bootstrap.servers", 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+        kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 
         kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
         String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
@@ -111,6 +111,8 @@ public class KafkaMirroringSink implements 
RequestMirroringSink, Closeable {
         kafkaProducerProps.put("key.serializer", 
StringSerializer.class.getName());
         kafkaProducerProps.put("value.serializer", 
MirroredSolrRequestSerializer.class.getName());
 
+        KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps);
+
         kafkaProducerProps.putAll(conf.getAdditionalProperties());
 
         if (log.isDebugEnabled()) {
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
index 4c96116..74dc785 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -17,10 +17,8 @@
 package org.apache.solr.crossdc.common;
 
 import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.common.params.SolrParams;
 
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Class to encapsulate a mirrored Solr request.
@@ -55,29 +53,6 @@ public class MirroredSolrRequest {
         solrRequest = null;
     }
 
-    public static MirroredSolrRequest 
mirroredAdminCollectionRequest(SolrParams params) {
-        Map<String, List<String>> createParams = new HashMap();
-        // don't mirror back
-        createParams.put(CrossDcConstants.SHOULD_MIRROR, 
Collections.singletonList("false"));
-
-        final Iterator<String> paramNamesIterator = 
params.getParameterNamesIterator();
-        while (paramNamesIterator.hasNext()) {
-            final String key = paramNamesIterator.next();
-            if (key.equals("createNodeSet") || key.equals("node")) {
-                // don't forward as nodeset most likely makes no sense here.
-                // should we log when we skip this parameter that was part of 
the original request ?
-                continue;
-            }
-            final String[] values = params.getParams(key);
-            if (values != null) {
-                createParams.put(key, Arrays.asList(values));
-            }
-        }
-
-        return new MirroredSolrRequest(1,
-                TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
-    }
-
     public int getAttempt() {
         return attempt;
     }
diff --git 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 856f1c9..3f0684d 100644
--- 
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++ 
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -18,12 +18,9 @@ package org.apache.solr.crossdc.common;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
 import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.params.MapSolrParams;
 import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.MultiMapSolrParams;
 import org.apache.solr.common.util.JavaBinCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -116,27 +113,30 @@ public class MirroredSolrRequestSerializer implements 
Serializer<MirroredSolrReq
         UpdateRequest solrRequest = (UpdateRequest) request.getSolrRequest();
 
         if (log.isTraceEnabled()) {
-            log.trace("serialize request={} docs={} deletebyid={}", 
solrRequest, solrRequest.getDocuments(), solrRequest.getDeleteById());
+            log.trace("serialize request={} docs={} deletebyid={}", 
solrRequest,
+                solrRequest.getDocuments(), solrRequest.getDeleteById());
         }
 
-        JavaBinCodec codec = new JavaBinCodec(null);
+        try (JavaBinCodec codec = new JavaBinCodec(null)) {
 
-        ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
-        Map map = new HashMap();
-        map.put("params", solrRequest.getParams());
-        map.put("docs", solrRequest.getDocuments());
+            ExposedByteArrayOutputStream baos = new 
ExposedByteArrayOutputStream();
+            Map map = new HashMap(4);
+            map.put("params", solrRequest.getParams());
+            map.put("docs", solrRequest.getDocuments());
 
-        // TODO
-        //map.put("deletes", solrRequest.getDeleteByIdMap());
-        map.put("deletes", solrRequest.getDeleteById());
-        map.put("deleteQuery", solrRequest.getDeleteQuery());
+            // TODO
+            //map.put("deletes", solrRequest.getDeleteByIdMap());
+            map.put("deletes", solrRequest.getDeleteById());
+            map.put("deleteQuery", solrRequest.getDeleteQuery());
 
-        try {
             codec.marshal(map, baos);
+
+            return baos.byteArray();
+
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return baos.byteArray();
+
     }
 
     /**
@@ -145,7 +145,7 @@ public class MirroredSolrRequestSerializer implements 
Serializer<MirroredSolrReq
      * This method must be idempotent as it may be called multiple times.
      */
     @Override
-    public void close() {
+    public final void close() {
         Serializer.super.close();
     }
 
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index f77a5bb..b3cb2b3 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -22,9 +22,7 @@ import org.apache.solr.crossdc.common.ConfigProperty;
 import org.apache.solr.crossdc.common.CrossDcConf;
 import org.apache.solr.crossdc.common.KafkaCrossDcConf;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
-import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,22 +39,14 @@ import static 
org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
 // Cross-DC Consumer main class
 public class Consumer {
 
-
-
-
-    private static boolean enabled = true;
+    private static final boolean enabled = true;
 
     private static final Logger log = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    /**
-     * ExecutorService to manage the cross-dc consumer threads.
-     */
-    private ExecutorService consumerThreadExecutor;
-
     private Server server;
-    CrossDcConsumer crossDcConsumer;
+    private CrossDcConsumer crossDcConsumer;
 
-    public void start(Map<String, String> properties) {
+    public void start(Map<String, Object> properties) {
 
 
         //server = new Server();
@@ -70,7 +60,10 @@ public class Consumer {
 
         log.info("Starting CrossDC Consumer {}", conf);
 
-        consumerThreadExecutor = Executors.newSingleThreadExecutor();
+        /**
+         * ExecutorService to manage the cross-dc consumer threads.
+         */
+        ExecutorService consumerThreadExecutor = 
Executors.newSingleThreadExecutor();
         consumerThreadExecutor.submit(crossDcConsumer);
 
         // Register shutdown hook
@@ -84,13 +77,13 @@ public class Consumer {
 
     public static void main(String[] args) {
 
-        Map<String,String> properties = new HashMap<>();
+        Map<String,Object> properties = new HashMap<>();
 
         for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
             properties.put(configKey.getKey(), 
System.getProperty(configKey.getKey()));
         }
 
-        String zkConnectString = 
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
+        String zkConnectString = (String) 
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
         if (zkConnectString == null || zkConnectString.isBlank()) {
             throw new IllegalArgumentException("zkConnectString not specified 
for Consumer");
         }
@@ -98,20 +91,24 @@ public class Consumer {
         try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
 
             try {
-                if 
(client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, 
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
-                    byte[] data = 
client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, 
KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
+                if 
(client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                    CrossDcConf.CROSSDC_PROPERTIES), true)) {
+                    byte[] data = 
client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                        CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
                     Properties zkProps = new Properties();
                     zkProps.load(new ByteArrayInputStream(data));
-                    Properties zkPropsUnproccessed = new Properties(zkProps);
+
+                    Map<Object, Object> zkPropsUnproccessed = new 
HashMap<>(zkProps);
 
                     for (ConfigProperty configKey : 
KafkaCrossDcConf.CONFIG_PROPERTIES) {
-                        if (properties.get(configKey.getKey()) == null || 
properties.get(configKey.getKey()).isBlank()) {
-                            properties.put(configKey.getKey(), (String) 
zkProps.get(configKey.getKey()));
+                        if (properties.get(configKey.getKey()) == null || 
((String)properties.get(configKey.getKey())).isBlank()) {
+                            properties.put(configKey.getKey(), (String) 
zkProps.getProperty(
+                                configKey.getKey()));
                             zkPropsUnproccessed.remove(configKey.getKey());
                         }
                     }
-                    zkPropsUnproccessed.forEach((k, v) -> {
-                        properties.put((String) k, (String) v);
+                    zkPropsUnproccessed.forEach((key, val) -> {
+                        properties.put((String) key, (String) val);
                     });
                 }
             } catch (InterruptedException e) {
@@ -122,12 +119,12 @@ public class Consumer {
             }
         }
 
-        String bootstrapServers = 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
+        String bootstrapServers = (String) 
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
         if (bootstrapServers == null || bootstrapServers.isBlank()) {
             throw new IllegalArgumentException("bootstrapServers not specified 
for Consumer");
         }
 
-        String topicName = properties.get(TOPIC_NAME);
+        String topicName = (String) properties.get(TOPIC_NAME);
         if (topicName == null || topicName.isBlank()) {
             throw new IllegalArgumentException("topicName not specified for 
Consumer");
         }
@@ -136,7 +133,7 @@ public class Consumer {
         consumer.start(properties);
     }
 
-    public void shutdown() {
+    public final void shutdown() {
         if (crossDcConsumer != null) {
             crossDcConsumer.shutdown();
         }
diff --git 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 9f95a90..451253b 100644
--- 
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++ 
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -8,7 +8,6 @@ import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.common.util.IOUtils;
 import org.apache.solr.crossdc.common.*;
 import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
@@ -36,9 +35,9 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
   private final String topicName;
-  SolrMessageProcessor messageProcessor;
+  private final SolrMessageProcessor messageProcessor;
 
-  CloudSolrClient solrClient;
+  private final CloudSolrClient solrClient;
 
   /**
    * @param conf The Kafka consumer configuration
@@ -64,17 +63,15 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
     kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
     kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
 
+    KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
+
     kafkaConsumerProps.putAll(conf.getAdditionalProperties());
 
     solrClient =
         new 
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
             Optional.empty()).build();
 
-    messageProcessor = new SolrMessageProcessor(solrClient, new 
ResubmitBackoffPolicy() {
-      @Override public long getBackoffTimeMs(MirroredSolrRequest 
resubmitRequest) {
-        return 0;
-      }
-    });
+    messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest -> 
0L);
 
     log.info("Creating Kafka consumer with configuration {}", 
kafkaConsumerProps);
     consumer = createConsumer(kafkaConsumerProps);
@@ -86,10 +83,9 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
 
   }
 
-  private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties 
properties) {
-    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new 
StringDeserializer(),
+  public static KafkaConsumer<String, MirroredSolrRequest> 
createConsumer(Properties properties) {
+    return new KafkaConsumer<>(properties, new StringDeserializer(),
         new MirroredSolrRequestSerializer());
-    return kafkaConsumer;
   }
 
   /**
@@ -145,7 +141,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
               log.trace("Fetched record from topic={} partition={} key={} 
value={}", record.topic(),
                   record.partition(), record.key(), record.value());
             }
-            IQueueHandler.Result result = 
messageProcessor.handleItem(record.value());
+            IQueueHandler.Result<MirroredSolrRequest> result = 
messageProcessor.handleItem(record.value());
             switch (result.status()) {
               case FAILED_RESUBMIT:
                 // currently, we use a strategy taken from an earlier working 
implementation
@@ -200,8 +196,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
         } catch (Exception e) {
           // If there is any exception returned by handleItem, then reset the 
offset.
 
-          if (e instanceof ClassCastException || e instanceof 
ClassNotFoundException
-              || e instanceof SerializationException) { // TODO: optional
+          if (e instanceof ClassCastException || e instanceof 
SerializationException) { // TODO: optional
             log.error("Non retryable error", e);
             break;
           }
@@ -215,8 +210,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
       return false;
     } catch (Exception e) {
 
-      if (e instanceof ClassCastException || e instanceof 
ClassNotFoundException
-          || e instanceof SerializationException) { // TODO: optional
+      if (e instanceof ClassCastException || e instanceof 
SerializationException) { // TODO: optional
         log.error("Non retryable error", e);
         return false;
       }
@@ -262,7 +256,7 @@ public class KafkaCrossDcConsumer extends 
Consumer.CrossDcConsumer {
   /**
    * Shutdown the Kafka consumer by calling wakeup.
    */
-  public void shutdown() {
+  public final void shutdown() {
     log.info("Shutdown called on KafkaCrossDcConsumer");
     try {
       solrClient.close();
diff --git 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 02282b6..758ca8e 100644
--- 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++ 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -14,8 +14,6 @@ import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.common.cloud.*;
 import org.apache.solr.common.params.*;
 import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.schema.IndexSchema;
-import org.apache.solr.schema.SchemaField;
 import org.apache.solr.update.AddUpdateCommand;
 import org.apache.solr.update.CommitUpdateCommand;
 import org.apache.solr.update.DeleteUpdateCommand;
@@ -126,8 +124,9 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
           String nextCursorMark = rsp.getNextCursorMark();
 
           if (log.isDebugEnabled()) {
-            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, 
nextCursorMark, cnt++,
+            log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark, 
nextCursorMark, cnt,
                 rsp.getResults());
+            cnt++;
           }
 
           processDBQResults(client, collection, uniqueField, rsp);
@@ -147,15 +146,14 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
     if (doMirroring) {
       boolean isLeader = false;
       if (cmd.isDeleteById()) {
-        DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
         // deleteById requests runs once per leader, so we just submit the 
request from the leader shard
-        isLeader = isLeader(cmd.getReq(),  dcmd.getId(), null != 
cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
+        isLeader = isLeader(cmd.getReq(),  ((DeleteUpdateCommand)cmd).getId(), 
null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
             ShardParams._ROUTE_), null);
         if (isLeader) {
           createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip 
versions from deletes
         }
         if (log.isDebugEnabled())
-          log.debug("processDelete doMirroring={} isLeader={} cmd={}", 
doMirroring, isLeader, cmd);
+          log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, 
isLeader, cmd);
       } else {
         // DBQs are sent to each shard leader, so we mirror from the original 
node to only mirror once
         // In general there's no way to guarantee that these run identically 
on the mirror since there are no
@@ -166,16 +164,17 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
           createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
         }
         if (log.isDebugEnabled())
-          log.debug("processDelete doMirroring={} cmd={}", doMirroring, cmd);
+          log.debug("processDelete doMirroring={} cmd={}", true, cmd);
       }
 
     }
   }
 
-  private void processDBQResults(SolrClient client, String collection, String 
uniqueField, QueryResponse rsp)
+  private static void processDBQResults(SolrClient client, String collection, 
String uniqueField,
+      QueryResponse rsp)
       throws SolrServerException, IOException {
     SolrDocumentList results = rsp.getResults();
-    List<String> ids = new ArrayList<>();
+    List<String> ids = new ArrayList<>(results.size());
     results.forEach(entries -> {
       String id = entries.getFirstValue(uniqueField).toString();
       ids.add(id);
@@ -225,7 +224,7 @@ public class MirroringUpdateProcessor extends 
UpdateRequestProcessor {
     if (next != null) next.processCommit(cmd);
   }
 
-  @Override public void finish() throws IOException {
+  @Override public final void finish() throws IOException {
     super.finish();
 
     if (doMirroring && mirrorRequest != null) {
diff --git 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 57a18ad..160cb46 100644
--- 
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++ 
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -30,8 +30,6 @@ import org.apache.solr.crossdc.common.KafkaMirroringSink;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.util.plugin.SolrCoreAware;
-import org.apache.zookeeper.KeeperException;
-import org.checkerframework.checker.units.qual.C;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,25 +64,15 @@ public class MirroringUpdateRequestProcessorFactory extends 
UpdateRequestProcess
         new NoOpUpdateRequestProcessor();
 
     // Flag for mirroring requests
-    public static String SERVER_SHOULD_MIRROR = "shouldMirror";
+    public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
 
     /** This is instantiated in inform(SolrCore) and then shared by all 
processor instances - visible for testing */
-    volatile KafkaRequestMirroringHandler mirroringHandler;
-//    private String topicName;
-//    private String bootstrapServers;
-//
-//    private Integer batchSizeBytes;
-//    private Integer bufferMemoryBytes;
-//    private Integer lingerMs;
-//    private Integer requestTimeout;
-//
-//    private Integer maxRequestSize;
-//
-//    private String enableDataCompression;
+    private volatile KafkaRequestMirroringHandler mirroringHandler;
+
 
     private boolean enabled = true;
 
-    private final Map<String,String> properties = new HashMap<>();
+    private final Map<String,Object> properties = new HashMap<>();
 
     @Override
     public void init(final NamedList args) {
@@ -100,14 +88,30 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         }
     }
 
-    private class Closer {
+    private static class MyCloseHook extends CloseHook {
+        private final Closer closer;
+
+        public MyCloseHook(Closer closer) {
+            this.closer = closer;
+        }
+
+        @Override public void preClose(SolrCore core) {
+
+        }
+
+        @Override public void postClose(SolrCore core) {
+            closer.close();
+        }
+    }
+
+    private static class Closer {
         private final KafkaMirroringSink sink;
 
         public Closer(KafkaMirroringSink sink) {
             this.sink = sink;
         }
 
-        public void close() {
+        public final void close() {
             try {
                 this.sink.close();
             } catch (IOException e) {
@@ -127,12 +131,15 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         Properties zkProps = null;
         try {
             if (core.getCoreContainer().getZkController()
-                .getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
-                byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
+                .getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                    CrossDcConf.CROSSDC_PROPERTIES), true)) {
+                byte[] data = core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+                    CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
 
                 if (data == null) {
-                    log.error(KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
-                    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
+                    log.error(CrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
+                    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, CrossDcConf.CROSSDC_PROPERTIES
+                        + " file in Zookeeper has no data");
                 }
 
                 zkProps = new Properties();
@@ -140,13 +147,14 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
                 Properties zkPropsUnproccessed = new Properties(zkProps);
 
                 for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
-                    if (properties.get(configKey.getKey()) == null || properties.get(configKey.getKey()).isBlank()) {
-                        properties.put(configKey.getKey(), (String) zkProps.get(configKey.getKey()));
+                    if (properties.get(configKey.getKey()) == null || ((String)properties.get(configKey.getKey())).isBlank()) {
+                        properties.put(configKey.getKey(), (String) zkProps.getProperty(
+                            configKey.getKey()));
                         zkPropsUnproccessed.remove(configKey.getKey());
                     }
                 }
-                zkPropsUnproccessed.forEach((k, v) -> {
-                    properties.put((String) k, (String) v);
+                zkPropsUnproccessed.forEach((key, val) -> {
+                    properties.put((String) key, (String) val);
                 });
              }
         } catch (InterruptedException e) {
@@ -158,17 +166,21 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", e);
         }
 
-        if (properties.get(BOOTSTRAP_SERVERS) == null || properties.get(BOOTSTRAP_SERVERS).isBlank()) {
-           log.error("boostrapServers not specified for producer in CrossDC configuration props=" + properties + " zkProps=" + zkProps);
+        if (properties.get(BOOTSTRAP_SERVERS) == null || ((String)properties.get(BOOTSTRAP_SERVERS)).isBlank()) {
+            log.error(
+                "boostrapServers not specified for producer in CrossDC configuration props={} zkProps={}",
+                properties, zkProps);
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "boostrapServers not specified for producer");
        }
         
-        if (properties.get(TOPIC_NAME) == null || properties.get(TOPIC_NAME).isBlank()) {
-            log.error("topicName not specified for producer in CrossDC configuration props=" + properties + " zkProps=" + zkProps);
+        if (properties.get(TOPIC_NAME) == null || ((String)properties.get(TOPIC_NAME)).isBlank()) {
+            log.error(
+                "topicName not specified for producer in CrossDC configuration props={} zkProps={}",
+                properties, zkProps);
             throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "topicName not specified for producer");
         }
 
-        log.info("bootstrapServers={} topicName={}", properties.get(BOOTSTRAP_SERVERS) , properties.get(TOPIC_NAME));
+        log.info("bootstrapServers={} topicName={}", properties.get(BOOTSTRAP_SERVERS), properties.get(TOPIC_NAME));
 
         // load the request mirroring sink class and instantiate.
        // mirroringHandler = core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(), KafkaRequestMirroringHandler.class);
@@ -177,20 +189,12 @@ public class MirroringUpdateRequestProcessorFactory extends UpdateRequestProcess
         KafkaMirroringSink sink = new KafkaMirroringSink(conf);
 
         Closer closer = new Closer(sink);
-        core.addCloseHook(new CloseHook() {
-            @Override public void preClose(SolrCore core) {
-
-            }
-
-            @Override public void postClose(SolrCore core) {
-                closer.close();
-            }
-        });
+        core.addCloseHook(new MyCloseHook(closer));
 
         mirroringHandler = new KafkaRequestMirroringHandler(sink);
     }
 
-    private Integer getIntegerPropValue(String name, Properties props) {
+    private static Integer getIntegerPropValue(String name, Properties props) {
         String value = props.getProperty(name);
         if (value == null) {
             return null;
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index a2928ca..cf25ebe 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -107,7 +107,7 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    Map<String, String> properties = new HashMap<>();
+    Map<String, Object> properties = new HashMap<>();
     properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index b15cc25..8cbdae2 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -109,7 +109,7 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    Map<String, String> properties = new HashMap<>();
+    Map<String,Object> properties = new HashMap<>();
     properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 4b45506..1151b60 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -103,7 +103,7 @@ import static org.mockito.Mockito.spy;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    Map<String, String> properties = new HashMap<>();
+    Map<String, Object> properties = new HashMap<>();
     properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 3ee449f..e25ac83 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -91,11 +91,12 @@ import java.util.*;
     log.info("bootstrapServers={}", bootstrapServers);
 
 
-    Map<String, String> properties = new HashMap<>();
+    Map<String, Object> properties = new HashMap<>();
     properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
     properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+    properties.put(KafkaCrossDcConf.MAX_POLL_RECORDS, 3);
     consumer.start(properties);
 
   }
diff --git a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index b52a9c5..d496e7c 100644
--- a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++ b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -111,7 +111,7 @@ import java.util.Properties;
     String bootstrapServers = kafkaCluster.bootstrapServers();
     log.info("bootstrapServers={}", bootstrapServers);
 
-    Map<String, String> properties = new HashMap<>();
+    Map<String, Object> properties = new HashMap<>();
     properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
     properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
     properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);

Reply via email to