This is an automated email from the ASF dual-hosted git repository.
markrmiller pushed a commit to branch crossdc-wip
in repository https://gitbox.apache.org/repos/asf/solr-sandbox.git
The following commit(s) were added to refs/heads/crossdc-wip by this push:
new 6c907ca SSL config passed on to Kafka and a variety of general
cleanup. (#37)
6c907ca is described below
commit 6c907cafef7f8d69494820be789f22e3f8ef92df
Author: Mark Robert Miller <[email protected]>
AuthorDate: Wed Sep 14 05:06:17 2022 -0500
SSL config passed on to Kafka and a variety of general cleanup. (#37)
---
.../apache/solr/crossdc/common/ConfigProperty.java | 15 +++-
.../solr/crossdc/common/KafkaCrossDcConf.java | 58 +++++++++------
.../solr/crossdc/common/KafkaMirroringSink.java | 4 +-
.../solr/crossdc/common/MirroredSolrRequest.java | 25 -------
.../common/MirroredSolrRequestSerializer.java | 32 ++++----
.../org/apache/solr/crossdc/consumer/Consumer.java | 49 ++++++------
.../crossdc/consumer/KafkaCrossDcConsumer.java | 28 +++----
.../update/processor/MirroringUpdateProcessor.java | 19 +++--
.../MirroringUpdateRequestProcessorFactory.java | 86 +++++++++++-----------
.../apache/solr/crossdc/DeleteByQueryToIdTest.java | 2 +-
.../solr/crossdc/RetryQueueIntegrationTest.java | 2 +-
.../solr/crossdc/SolrAndKafkaIntegrationTest.java | 2 +-
.../solr/crossdc/SolrAndKafkaReindexTest.java | 3 +-
.../solr/crossdc/ZkConfigIntegrationTest.java | 2 +-
14 files changed, 160 insertions(+), 167 deletions(-)
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
index 256b6e0..8c1af02 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/ConfigProperty.java
@@ -1,6 +1,7 @@
package org.apache.solr.crossdc.common;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
public class ConfigProperty {
@@ -47,9 +48,12 @@ public class ConfigProperty {
}
public Integer getValueAsInt(Map properties) {
- String value = (String) properties.get(key);
+ Object value = (Object) properties.get(key);
if (value != null) {
- return Integer.parseInt(value);
+ if (value instanceof Integer) {
+ return (Integer) value;
+ }
+ return Integer.parseInt(value.toString());
}
if (defaultValue == null) {
return null;
@@ -58,9 +62,12 @@ public class ConfigProperty {
}
public Boolean getValueAsBoolean(Map properties) {
- String value = (String) properties.get(key);
+ Object value = (Object) properties.get(key);
if (value != null) {
- return Boolean.parseBoolean(value);
+ if (value instanceof Boolean) {
+ return (Boolean) value;
+ }
+ return Boolean.parseBoolean(value.toString());
}
return Boolean.parseBoolean(defaultValue);
}
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
index e7e510f..9a14a1b 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java
@@ -16,10 +16,8 @@
*/
package org.apache.solr.crossdc.common;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
@@ -90,10 +88,10 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String ZK_CONNECT_STRING = "zkConnectString";
-
-
public static final List<ConfigProperty> CONFIG_PROPERTIES;
- private static final HashMap<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+ private static final Map<String, ConfigProperty> CONFIG_PROPERTIES_MAP;
+
+ public static final List<ConfigProperty> SECURITY_CONFIG_PROPERTIES;
public static final String PORT = "port";
@@ -102,7 +100,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
static {
- CONFIG_PROPERTIES =
+ List<ConfigProperty> configProperties = new ArrayList<>(
List.of(new ConfigProperty(TOPIC_NAME), new
ConfigProperty(BOOTSTRAP_SERVERS),
new ConfigProperty(BATCH_SIZE_BYTES, DEFAULT_BATCH_SIZE_BYTES),
new ConfigProperty(BUFFER_MEMORY_BYTES,
DEFAULT_BUFFER_MEMORY_BYTES),
@@ -124,9 +122,11 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(MAX_PARTITION_FETCH_BYTES,
DEFAULT_MAX_PARTITION_FETCH_BYTES),
new ConfigProperty(MAX_POLL_RECORDS, DEFAULT_MAX_POLL_RECORDS),
new ConfigProperty(PORT, DEFAULT_PORT),
- new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID),
-
- // SSL
+ new ConfigProperty(GROUP_ID, DEFAULT_GROUP_ID)));
+
+
+ SECURITY_CONFIG_PROPERTIES =
+ List.of(
new ConfigProperty(SslConfigs.SSL_PROTOCOL_CONFIG),
new ConfigProperty(SslConfigs.SSL_PROVIDER_CONFIG),
new ConfigProperty(SslConfigs.SSL_CIPHER_SUITES_CONFIG),
@@ -149,22 +149,25 @@ public class KafkaCrossDcConf extends CrossDcConf {
new ConfigProperty(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG),
- // Admin Client Security
- new ConfigProperty(AdminClientConfig.SECURITY_PROTOCOL_CONFIG),
+ // From Common and Admin Client Security
+ new ConfigProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG),
new ConfigProperty(AdminClientConfig.SECURITY_PROVIDERS_CONFIG)
- );
-
+ );
+ configProperties.addAll(SECURITY_CONFIG_PROPERTIES);
+ CONFIG_PROPERTIES = Collections.unmodifiableList(configProperties);
- CONFIG_PROPERTIES_MAP = new HashMap<String,
ConfigProperty>(CONFIG_PROPERTIES.size());
+ Map<String, ConfigProperty> configPropertiesMap =
+ new HashMap<String, ConfigProperty>(CONFIG_PROPERTIES.size());
for (ConfigProperty prop : CONFIG_PROPERTIES) {
- CONFIG_PROPERTIES_MAP.put(prop.getKey(), prop);
+ configPropertiesMap.put(prop.getKey(), prop);
}
+ CONFIG_PROPERTIES_MAP = configPropertiesMap;
}
- private final Map<String, String> properties;
+ private final Map<String, Object> properties;
- public KafkaCrossDcConf(Map<String, String> properties) {
+ public KafkaCrossDcConf(Map<String, Object> properties) {
List<String> nullValueKeys = new ArrayList<String>();
properties.forEach((k, v) -> {
if (v == null) {
@@ -175,6 +178,15 @@ public class KafkaCrossDcConf extends CrossDcConf {
this.properties = properties;
}
+ public static void addSecurityProps(KafkaCrossDcConf conf, Properties
kafkaConsumerProps) {
+ for (ConfigProperty property : SECURITY_CONFIG_PROPERTIES) {
+ String val = conf.get(property.getKey());
+ if (val != null) {
+ kafkaConsumerProps.put(property.getKey(), val);
+ }
+ }
+ }
+
public String get(String property) {
return CONFIG_PROPERTIES_MAP.get(property).getValue(properties);
}
@@ -202,15 +214,17 @@ public class KafkaCrossDcConf extends CrossDcConf {
additional.remove(configProperty.getKey());
}
Map<String, Object> integerProperties = new HashMap<>();
- additional.forEach((k, v) -> {
+ additional.forEach((key, v) -> {
try {
int intVal = Integer.parseInt((String) v);
- integerProperties.put(k.toString(), intVal);
+ integerProperties.put(key.toString(), intVal);
} catch (NumberFormatException ignored) {
}
});
- additional.putAll(integerProperties);
+ integerProperties.forEach((key, v) -> {
+ additional.setProperty(key, (String) v);
+ });
return additional;
}
@@ -222,7 +236,7 @@ public class KafkaCrossDcConf extends CrossDcConf {
}
sb.setLength(sb.length() - 1);
- return "KafkaCrossDcConf{" + sb.toString() + "}";
+ return "KafkaCrossDcConf{" + sb + "}";
}
}
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
index 5afef79..9cfa67f 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java
@@ -93,7 +93,7 @@ public class KafkaMirroringSink implements
RequestMirroringSink, Closeable {
log.info("Creating Kafka producer! Configurations {} ",
conf.toString());
- kafkaProducerProps.put("bootstrap.servers",
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
@@ -111,6 +111,8 @@ public class KafkaMirroringSink implements
RequestMirroringSink, Closeable {
kafkaProducerProps.put("key.serializer",
StringSerializer.class.getName());
kafkaProducerProps.put("value.serializer",
MirroredSolrRequestSerializer.class.getName());
+ KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps);
+
kafkaProducerProps.putAll(conf.getAdditionalProperties());
if (log.isDebugEnabled()) {
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
index 4c96116..74dc785 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequest.java
@@ -17,10 +17,8 @@
package org.apache.solr.crossdc.common;
import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.common.params.SolrParams;
import java.util.*;
-import java.util.concurrent.TimeUnit;
/**
* Class to encapsulate a mirrored Solr request.
@@ -55,29 +53,6 @@ public class MirroredSolrRequest {
solrRequest = null;
}
- public static MirroredSolrRequest
mirroredAdminCollectionRequest(SolrParams params) {
- Map<String, List<String>> createParams = new HashMap();
- // don't mirror back
- createParams.put(CrossDcConstants.SHOULD_MIRROR,
Collections.singletonList("false"));
-
- final Iterator<String> paramNamesIterator =
params.getParameterNamesIterator();
- while (paramNamesIterator.hasNext()) {
- final String key = paramNamesIterator.next();
- if (key.equals("createNodeSet") || key.equals("node")) {
- // don't forward as nodeset most likely makes no sense here.
- // should we log when we skip this parameter that was part of
the original request ?
- continue;
- }
- final String[] values = params.getParams(key);
- if (values != null) {
- createParams.put(key, Arrays.asList(values));
- }
- }
-
- return new MirroredSolrRequest(1,
- TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()));
- }
-
public int getAttempt() {
return attempt;
}
diff --git
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
index 856f1c9..3f0684d 100644
---
a/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
+++
b/crossdc-commons/src/main/java/org/apache/solr/crossdc/common/MirroredSolrRequestSerializer.java
@@ -18,12 +18,9 @@ package org.apache.solr.crossdc.common;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.solr.client.solrj.SolrRequest;
-import org.apache.solr.client.solrj.request.JavaBinUpdateRequestCodec;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.params.MapSolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
-import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.JavaBinCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,27 +113,30 @@ public class MirroredSolrRequestSerializer implements
Serializer<MirroredSolrReq
UpdateRequest solrRequest = (UpdateRequest) request.getSolrRequest();
if (log.isTraceEnabled()) {
- log.trace("serialize request={} docs={} deletebyid={}",
solrRequest, solrRequest.getDocuments(), solrRequest.getDeleteById());
+ log.trace("serialize request={} docs={} deletebyid={}",
solrRequest,
+ solrRequest.getDocuments(), solrRequest.getDeleteById());
}
- JavaBinCodec codec = new JavaBinCodec(null);
+ try (JavaBinCodec codec = new JavaBinCodec(null)) {
- ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
- Map map = new HashMap();
- map.put("params", solrRequest.getParams());
- map.put("docs", solrRequest.getDocuments());
+ ExposedByteArrayOutputStream baos = new
ExposedByteArrayOutputStream();
+ Map map = new HashMap(4);
+ map.put("params", solrRequest.getParams());
+ map.put("docs", solrRequest.getDocuments());
- // TODO
- //map.put("deletes", solrRequest.getDeleteByIdMap());
- map.put("deletes", solrRequest.getDeleteById());
- map.put("deleteQuery", solrRequest.getDeleteQuery());
+ // TODO
+ //map.put("deletes", solrRequest.getDeleteByIdMap());
+ map.put("deletes", solrRequest.getDeleteById());
+ map.put("deleteQuery", solrRequest.getDeleteQuery());
- try {
codec.marshal(map, baos);
+
+ return baos.byteArray();
+
} catch (IOException e) {
throw new RuntimeException(e);
}
- return baos.byteArray();
+
}
/**
@@ -145,7 +145,7 @@ public class MirroredSolrRequestSerializer implements
Serializer<MirroredSolrReq
* This method must be idempotent as it may be called multiple times.
*/
@Override
- public void close() {
+ public final void close() {
Serializer.super.close();
}
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
index f77a5bb..b3cb2b3 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/Consumer.java
@@ -22,9 +22,7 @@ import org.apache.solr.crossdc.common.ConfigProperty;
import org.apache.solr.crossdc.common.CrossDcConf;
import org.apache.solr.crossdc.common.KafkaCrossDcConf;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
-import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,22 +39,14 @@ import static
org.apache.solr.crossdc.common.KafkaCrossDcConf.*;
// Cross-DC Consumer main class
public class Consumer {
-
-
-
- private static boolean enabled = true;
+ private static final boolean enabled = true;
private static final Logger log =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- /**
- * ExecutorService to manage the cross-dc consumer threads.
- */
- private ExecutorService consumerThreadExecutor;
-
private Server server;
- CrossDcConsumer crossDcConsumer;
+ private CrossDcConsumer crossDcConsumer;
- public void start(Map<String, String> properties) {
+ public void start(Map<String, Object> properties) {
//server = new Server();
@@ -70,7 +60,10 @@ public class Consumer {
log.info("Starting CrossDC Consumer {}", conf);
- consumerThreadExecutor = Executors.newSingleThreadExecutor();
+ /**
+ * ExecutorService to manage the cross-dc consumer threads.
+ */
+ ExecutorService consumerThreadExecutor =
Executors.newSingleThreadExecutor();
consumerThreadExecutor.submit(crossDcConsumer);
// Register shutdown hook
@@ -84,13 +77,13 @@ public class Consumer {
public static void main(String[] args) {
- Map<String,String> properties = new HashMap<>();
+ Map<String,Object> properties = new HashMap<>();
for (ConfigProperty configKey : KafkaCrossDcConf.CONFIG_PROPERTIES) {
properties.put(configKey.getKey(),
System.getProperty(configKey.getKey()));
}
- String zkConnectString =
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
+ String zkConnectString = (String)
properties.get(KafkaCrossDcConf.ZK_CONNECT_STRING);
if (zkConnectString == null || zkConnectString.isBlank()) {
throw new IllegalArgumentException("zkConnectString not specified
for Consumer");
}
@@ -98,20 +91,24 @@ public class Consumer {
try (SolrZkClient client = new SolrZkClient(zkConnectString, 15000)) {
try {
- if
(client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
- byte[] data =
client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
+ if
(client.exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+ CrossDcConf.CROSSDC_PROPERTIES), true)) {
+ byte[] data =
client.getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+ CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
Properties zkProps = new Properties();
zkProps.load(new ByteArrayInputStream(data));
- Properties zkPropsUnproccessed = new Properties(zkProps);
+
+ Map<Object, Object> zkPropsUnproccessed = new
HashMap<>(zkProps);
for (ConfigProperty configKey :
KafkaCrossDcConf.CONFIG_PROPERTIES) {
- if (properties.get(configKey.getKey()) == null ||
properties.get(configKey.getKey()).isBlank()) {
- properties.put(configKey.getKey(), (String)
zkProps.get(configKey.getKey()));
+ if (properties.get(configKey.getKey()) == null ||
((String)properties.get(configKey.getKey())).isBlank()) {
+ properties.put(configKey.getKey(), (String)
zkProps.getProperty(
+ configKey.getKey()));
zkPropsUnproccessed.remove(configKey.getKey());
}
}
- zkPropsUnproccessed.forEach((k, v) -> {
- properties.put((String) k, (String) v);
+ zkPropsUnproccessed.forEach((key, val) -> {
+ properties.put((String) key, (String) val);
});
}
} catch (InterruptedException e) {
@@ -122,12 +119,12 @@ public class Consumer {
}
}
- String bootstrapServers =
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
+ String bootstrapServers = (String)
properties.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS);
if (bootstrapServers == null || bootstrapServers.isBlank()) {
throw new IllegalArgumentException("bootstrapServers not specified
for Consumer");
}
- String topicName = properties.get(TOPIC_NAME);
+ String topicName = (String) properties.get(TOPIC_NAME);
if (topicName == null || topicName.isBlank()) {
throw new IllegalArgumentException("topicName not specified for
Consumer");
}
@@ -136,7 +133,7 @@ public class Consumer {
consumer.start(properties);
}
- public void shutdown() {
+ public final void shutdown() {
if (crossDcConsumer != null) {
crossDcConsumer.shutdown();
}
diff --git
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
index 9f95a90..451253b 100644
---
a/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
+++
b/crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java
@@ -8,7 +8,6 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.crossdc.common.*;
import org.apache.solr.crossdc.messageprocessor.SolrMessageProcessor;
@@ -36,9 +35,9 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
private final static int KAFKA_CONSUMER_POLL_TIMEOUT_MS = 5000;
private final String topicName;
- SolrMessageProcessor messageProcessor;
+ private final SolrMessageProcessor messageProcessor;
- CloudSolrClient solrClient;
+ private final CloudSolrClient solrClient;
/**
* @param conf The Kafka consumer configuration
@@ -64,17 +63,15 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+ KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
+
kafkaConsumerProps.putAll(conf.getAdditionalProperties());
solrClient =
new
CloudSolrClient.Builder(Collections.singletonList(conf.get(KafkaCrossDcConf.ZK_CONNECT_STRING)),
Optional.empty()).build();
- messageProcessor = new SolrMessageProcessor(solrClient, new
ResubmitBackoffPolicy() {
- @Override public long getBackoffTimeMs(MirroredSolrRequest
resubmitRequest) {
- return 0;
- }
- });
+ messageProcessor = new SolrMessageProcessor(solrClient, resubmitRequest ->
0L);
log.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProps);
consumer = createConsumer(kafkaConsumerProps);
@@ -86,10 +83,9 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
}
- private KafkaConsumer<String, MirroredSolrRequest> createConsumer(Properties
properties) {
- KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, new
StringDeserializer(),
+ public static KafkaConsumer<String, MirroredSolrRequest>
createConsumer(Properties properties) {
+ return new KafkaConsumer<>(properties, new StringDeserializer(),
new MirroredSolrRequestSerializer());
- return kafkaConsumer;
}
/**
@@ -145,7 +141,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
log.trace("Fetched record from topic={} partition={} key={}
value={}", record.topic(),
record.partition(), record.key(), record.value());
}
- IQueueHandler.Result result =
messageProcessor.handleItem(record.value());
+ IQueueHandler.Result<MirroredSolrRequest> result =
messageProcessor.handleItem(record.value());
switch (result.status()) {
case FAILED_RESUBMIT:
// currently, we use a strategy taken from an earlier working
implementation
@@ -200,8 +196,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
} catch (Exception e) {
// If there is any exception returned by handleItem, then reset the
offset.
- if (e instanceof ClassCastException || e instanceof
ClassNotFoundException
- || e instanceof SerializationException) { // TODO: optional
+ if (e instanceof ClassCastException || e instanceof
SerializationException) { // TODO: optional
log.error("Non retryable error", e);
break;
}
@@ -215,8 +210,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
return false;
} catch (Exception e) {
- if (e instanceof ClassCastException || e instanceof
ClassNotFoundException
- || e instanceof SerializationException) { // TODO: optional
+ if (e instanceof ClassCastException || e instanceof
SerializationException) { // TODO: optional
log.error("Non retryable error", e);
return false;
}
@@ -262,7 +256,7 @@ public class KafkaCrossDcConsumer extends
Consumer.CrossDcConsumer {
/**
* Shutdown the Kafka consumer by calling wakeup.
*/
- public void shutdown() {
+ public final void shutdown() {
log.info("Shutdown called on KafkaCrossDcConsumer");
try {
solrClient.close();
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
index 02282b6..758ca8e 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateProcessor.java
@@ -14,8 +14,6 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.*;
import org.apache.solr.common.params.*;
import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.schema.IndexSchema;
-import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@@ -126,8 +124,9 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
String nextCursorMark = rsp.getNextCursorMark();
if (log.isDebugEnabled()) {
- log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark,
nextCursorMark, cnt++,
+ log.debug("resp: cm={}, ncm={}, cnt={}, results={} ", cursorMark,
nextCursorMark, cnt,
rsp.getResults());
+ cnt++;
}
processDBQResults(client, collection, uniqueField, rsp);
@@ -147,15 +146,14 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
if (doMirroring) {
boolean isLeader = false;
if (cmd.isDeleteById()) {
- DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
// deleteById requests runs once per leader, so we just submit the
request from the leader shard
- isLeader = isLeader(cmd.getReq(), dcmd.getId(), null !=
cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
+ isLeader = isLeader(cmd.getReq(), ((DeleteUpdateCommand)cmd).getId(),
null != cmd.getRoute() ? cmd.getRoute() : cmd.getReq().getParams().get(
ShardParams._ROUTE_), null);
if (isLeader) {
createAndOrGetMirrorRequest().deleteById(cmd.getId()); // strip
versions from deletes
}
if (log.isDebugEnabled())
- log.debug("processDelete doMirroring={} isLeader={} cmd={}",
doMirroring, isLeader, cmd);
+ log.debug("processDelete doMirroring={} isLeader={} cmd={}", true,
isLeader, cmd);
} else {
// DBQs are sent to each shard leader, so we mirror from the original
node to only mirror once
// In general there's no way to guarantee that these run identically
on the mirror since there are no
@@ -166,16 +164,17 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
createAndOrGetMirrorRequest().deleteByQuery(cmd.query);
}
if (log.isDebugEnabled())
- log.debug("processDelete doMirroring={} cmd={}", doMirroring, cmd);
+ log.debug("processDelete doMirroring={} cmd={}", true, cmd);
}
}
}
- private void processDBQResults(SolrClient client, String collection, String
uniqueField, QueryResponse rsp)
+ private static void processDBQResults(SolrClient client, String collection,
String uniqueField,
+ QueryResponse rsp)
throws SolrServerException, IOException {
SolrDocumentList results = rsp.getResults();
- List<String> ids = new ArrayList<>();
+ List<String> ids = new ArrayList<>(results.size());
results.forEach(entries -> {
String id = entries.getFirstValue(uniqueField).toString();
ids.add(id);
@@ -225,7 +224,7 @@ public class MirroringUpdateProcessor extends
UpdateRequestProcessor {
if (next != null) next.processCommit(cmd);
}
- @Override public void finish() throws IOException {
+ @Override public final void finish() throws IOException {
super.finish();
if (doMirroring && mirrorRequest != null) {
diff --git
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
index 57a18ad..160cb46 100644
---
a/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
+++
b/crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java
@@ -30,8 +30,6 @@ import org.apache.solr.crossdc.common.KafkaMirroringSink;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.plugin.SolrCoreAware;
-import org.apache.zookeeper.KeeperException;
-import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,25 +64,15 @@ public class MirroringUpdateRequestProcessorFactory extends
UpdateRequestProcess
new NoOpUpdateRequestProcessor();
// Flag for mirroring requests
- public static String SERVER_SHOULD_MIRROR = "shouldMirror";
+ public static final String SERVER_SHOULD_MIRROR = "shouldMirror";
/** This is instantiated in inform(SolrCore) and then shared by all
processor instances - visible for testing */
- volatile KafkaRequestMirroringHandler mirroringHandler;
-// private String topicName;
-// private String bootstrapServers;
-//
-// private Integer batchSizeBytes;
-// private Integer bufferMemoryBytes;
-// private Integer lingerMs;
-// private Integer requestTimeout;
-//
-// private Integer maxRequestSize;
-//
-// private String enableDataCompression;
+ private volatile KafkaRequestMirroringHandler mirroringHandler;
+
private boolean enabled = true;
- private final Map<String,String> properties = new HashMap<>();
+ private final Map<String,Object> properties = new HashMap<>();
@Override
public void init(final NamedList args) {
@@ -100,14 +88,30 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
}
}
- private class Closer {
+ private static class MyCloseHook extends CloseHook {
+ private final Closer closer;
+
+ public MyCloseHook(Closer closer) {
+ this.closer = closer;
+ }
+
+ @Override public void preClose(SolrCore core) {
+
+ }
+
+ @Override public void postClose(SolrCore core) {
+ closer.close();
+ }
+ }
+
+ private static class Closer {
private final KafkaMirroringSink sink;
public Closer(KafkaMirroringSink sink) {
this.sink = sink;
}
- public void close() {
+ public final void close() {
try {
this.sink.close();
} catch (IOException e) {
@@ -127,12 +131,15 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
Properties zkProps = null;
try {
if (core.getCoreContainer().getZkController()
-
.getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), true)) {
- byte[] data =
core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
KafkaCrossDcConf.CROSSDC_PROPERTIES), null, null, true);
+
.getZkClient().exists(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+ CrossDcConf.CROSSDC_PROPERTIES), true)) {
+ byte[] data =
core.getCoreContainer().getZkController().getZkClient().getData(System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
+ CrossDcConf.CROSSDC_PROPERTIES), null, null, true);
if (data == null) {
- log.error(KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in
Zookeeper has no data");
- throw new
SolrException(SolrException.ErrorCode.SERVER_ERROR,
KafkaCrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
+ log.error(CrossDcConf.CROSSDC_PROPERTIES + " file in
Zookeeper has no data");
+ throw new
SolrException(SolrException.ErrorCode.SERVER_ERROR,
CrossDcConf.CROSSDC_PROPERTIES
+ + " file in Zookeeper has no data");
}
zkProps = new Properties();
@@ -140,13 +147,14 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
Properties zkPropsUnproccessed = new Properties(zkProps);
for (ConfigProperty configKey :
KafkaCrossDcConf.CONFIG_PROPERTIES) {
- if (properties.get(configKey.getKey()) == null ||
properties.get(configKey.getKey()).isBlank()) {
- properties.put(configKey.getKey(), (String)
zkProps.get(configKey.getKey()));
+ if (properties.get(configKey.getKey()) == null ||
((String)properties.get(configKey.getKey())).isBlank()) {
+ properties.put(configKey.getKey(), (String)
zkProps.getProperty(
+ configKey.getKey()));
zkPropsUnproccessed.remove(configKey.getKey());
}
}
- zkPropsUnproccessed.forEach((k, v) -> {
- properties.put((String) k, (String) v);
+ zkPropsUnproccessed.forEach((key, val) -> {
+ properties.put((String) key, (String) val);
});
}
} catch (InterruptedException e) {
@@ -158,17 +166,21 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Exception looking for CrossDC configuration in Zookeeper", e);
}
- if (properties.get(BOOTSTRAP_SERVERS) == null ||
properties.get(BOOTSTRAP_SERVERS).isBlank()) {
- log.error("boostrapServers not specified for producer in CrossDC
configuration props=" + properties + " zkProps=" + zkProps);
+ if (properties.get(BOOTSTRAP_SERVERS) == null ||
((String)properties.get(BOOTSTRAP_SERVERS)).isBlank()) {
+ log.error(
+ "boostrapServers not specified for producer in CrossDC
configuration props={} zkProps={}",
+ properties, zkProps);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"boostrapServers not specified for producer");
}
- if (properties.get(TOPIC_NAME) == null ||
properties.get(TOPIC_NAME).isBlank()) {
- log.error("topicName not specified for producer in CrossDC
configuration props=" + properties + " zkProps=" + zkProps);
+ if (properties.get(TOPIC_NAME) == null ||
((String)properties.get(TOPIC_NAME)).isBlank()) {
+ log.error(
+ "topicName not specified for producer in CrossDC configuration
props={} zkProps={}",
+ properties, zkProps);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"topicName not specified for producer");
}
- log.info("bootstrapServers={} topicName={}",
properties.get(BOOTSTRAP_SERVERS) , properties.get(TOPIC_NAME));
+ log.info("bootstrapServers={} topicName={}",
properties.get(BOOTSTRAP_SERVERS), properties.get(TOPIC_NAME));
// load the request mirroring sink class and instantiate.
// mirroringHandler =
core.getResourceLoader().newInstance(RequestMirroringHandler.class.getName(),
KafkaRequestMirroringHandler.class);
@@ -177,20 +189,12 @@ public class MirroringUpdateRequestProcessorFactory
extends UpdateRequestProcess
KafkaMirroringSink sink = new KafkaMirroringSink(conf);
Closer closer = new Closer(sink);
- core.addCloseHook(new CloseHook() {
- @Override public void preClose(SolrCore core) {
-
- }
-
- @Override public void postClose(SolrCore core) {
- closer.close();
- }
- });
+ core.addCloseHook(new MyCloseHook(closer));
mirroringHandler = new KafkaRequestMirroringHandler(sink);
}
- private Integer getIntegerPropValue(String name, Properties props) {
+ private static Integer getIntegerPropValue(String name, Properties props) {
String value = props.getProperty(name);
if (value == null) {
return null;
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
index a2928ca..cf25ebe 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/DeleteByQueryToIdTest.java
@@ -107,7 +107,7 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- Map<String, String> properties = new HashMap<>();
+ Map<String, Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
index b15cc25..8cbdae2 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/RetryQueueIntegrationTest.java
@@ -109,7 +109,7 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- Map<String, String> properties = new HashMap<>();
+ Map<String,Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
index 4b45506..1151b60 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaIntegrationTest.java
@@ -103,7 +103,7 @@ import static org.mockito.Mockito.spy;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- Map<String, String> properties = new HashMap<>();
+ Map<String, Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
index 3ee449f..e25ac83 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/SolrAndKafkaReindexTest.java
@@ -91,11 +91,12 @@ import java.util.*;
log.info("bootstrapServers={}", bootstrapServers);
- Map<String, String> properties = new HashMap<>();
+ Map<String, Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
+ properties.put(KafkaCrossDcConf.MAX_POLL_RECORDS, 3);
consumer.start(properties);
}
diff --git
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
index b52a9c5..d496e7c 100644
---
a/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
+++
b/crossdc-producer/src/test/java/org/apache/solr/crossdc/ZkConfigIntegrationTest.java
@@ -111,7 +111,7 @@ import java.util.Properties;
String bootstrapServers = kafkaCluster.bootstrapServers();
log.info("bootstrapServers={}", bootstrapServers);
- Map<String, String> properties = new HashMap<>();
+ Map<String, Object> properties = new HashMap<>();
properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING,
solrCluster2.getZkServer().getZkAddress());
properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);