HoustonPutman commented on code in PR #2541:
URL: https://github.com/apache/solr/pull/2541#discussion_r1657812390
##########
solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SimpleSolrIntegrationTest.java:
##########
@@ -46,21 +45,21 @@ public static void ensureWorkingMockito() {
@BeforeClass
public static void beforeSimpleSolrIntegrationTest() throws Exception {
-
+ System.setProperty("solr.crossdc.bootstrapServers", "doesnotmatter:9092");
+ System.setProperty("solr.crossdc.topicName", "doesnotmatter");
cluster1 =
configureCluster(2)
- .addConfig("conf", getFile("configs/cloud-minimal/conf").toPath())
+ // .addConfig("conf", getFile("configs/cloud-minimal/conf").toPath())
Review Comment:
why is this commented out?
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java:
##########
@@ -29,345 +33,369 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.CrossDcConstants;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
/**
- * Message processor implements all the logic to process a MirroredSolrRequest.
- * It handles:
- * 1. Sending the update request to Solr
- * 2. Discarding or retrying failed requests
- * 3. Flagging requests for resubmission by the underlying consumer implementation.
+ * Message processor implements all the logic to process a MirroredSolrRequest. It handles: 1.
Review Comment:
We can probably fix this by adding explicit line breaks (e.g. an HTML list) so the formatter keeps the numbered structure.
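A rough sketch that should survive google-java-format (untested):

```java
/**
 * Message processor implements all the logic to process a MirroredSolrRequest. It handles:
 *
 * <ol>
 *   <li>Sending the update request to Solr
 *   <li>Discarding or retrying failed requests
 *   <li>Flagging requests for resubmission by the underlying consumer implementation.
 * </ol>
 */
```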
##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java:
##########
@@ -248,8 +288,10 @@ public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
 if (log.isDebugEnabled())
 log.debug("processDelete doMirroring={} isLeader={} cmd={}", true, isLeader, cmd);
 } else {
- // DBQs are sent to each shard leader, so we mirror from the original node to only mirror once
- // In general there's no way to guarantee that these run identically on the mirror since there are no
+ // DBQs are sent to each shard leader, so we mirror from the original node to only mirror
+ // once
+ // In general there's no way to guarantee that these run identically on the mirror since
+ // there are no
Review Comment:
Make this a little prettier
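e.g. rewrap at phrase boundaries instead of letting the formatter break mid-sentence (sketch; the tail of the sentence is elided here as in the diff):

```java
// DBQs are sent to each shard leader, so we mirror from the original node
// to only mirror once. In general there's no way to guarantee that these
// run identically on the mirror, since there are no ...
```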
##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java:
##########
@@ -28,178 +37,200 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.solr.crossdc.common.KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS;
-
public class KafkaMirroringSink implements RequestMirroringSink, Closeable {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final KafkaCrossDcConf conf;
- private final Producer<String, MirroredSolrRequest> producer;
- private final KafkaConsumer<String,MirroredSolrRequest> consumer;
- private final String mainTopic;
- private final String dlqTopic;
-
- public KafkaMirroringSink(final KafkaCrossDcConf conf) {
- // Create Kafka Mirroring Sink
- this.conf = conf;
- this.producer = initProducer();
- this.consumer = initConsumer();
- this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
- this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);
-
- checkTopicsAvailability();
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final KafkaCrossDcConf conf;
+ private final Producer<String, MirroredSolrRequest<?>> producer;
+ private final KafkaConsumer<String, MirroredSolrRequest<?>> consumer;
+ private final String mainTopic;
+ private final String dlqTopic;
+
+ public KafkaMirroringSink(final KafkaCrossDcConf conf) {
+ // Create Kafka Mirroring Sink
+ this.conf = conf;
+ this.producer = initProducer();
+ this.consumer = initConsumer();
+ this.mainTopic = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",")[0];
+ this.dlqTopic = conf.get(KafkaCrossDcConf.DLQ_TOPIC_NAME);
+
+ checkTopicsAvailability();
+ }
+
+ @Override
+ public void submit(MirroredSolrRequest<?> request) throws MirroringException {
+ this.submitRequest(request, mainTopic);
+ }
+
+ @Override
+ public void submitToDlq(MirroredSolrRequest<?> request) throws MirroringException {
+ if (dlqTopic != null) {
+ this.submitRequest(request, dlqTopic);
+ } else {
+ if (log.isInfoEnabled()) {
+ log.info("- no DLQ, dropping failed {}", request);
+ }
}
+ }
- @Override
- public void submit(MirroredSolrRequest request) throws MirroringException {
- this.submitRequest(request, mainTopic);
- }
+ private void checkTopicsAvailability() {
+ final Map<String, List<PartitionInfo>> topics = this.consumer.listTopics();
- @Override
- public void submitToDlq(MirroredSolrRequest request) throws MirroringException {
- if (dlqTopic != null) {
- this.submitRequest(request, dlqTopic);
- } else {
- if (log.isInfoEnabled()) {
- log.info("- no DLQ, dropping failed {}", request);
- }
- }
+ if (mainTopic != null && !topics.containsKey(mainTopic)) {
+ throw new RuntimeException("Main topic " + mainTopic + " is not
available");
}
-
- private void checkTopicsAvailability() {
- final Map<String, List<PartitionInfo>> topics = this.consumer.listTopics();
-
- if (mainTopic != null && !topics.containsKey(mainTopic)) {
- throw new RuntimeException("Main topic " + mainTopic + " is not
available");
- }
- if (dlqTopic != null && !topics.containsKey(dlqTopic)) {
- throw new RuntimeException("DLQ topic " + dlqTopic + " is not
available");
- }
+ if (dlqTopic != null && !topics.containsKey(dlqTopic)) {
+ throw new RuntimeException("DLQ topic " + dlqTopic + " is not
available");
}
+ }
- private void submitRequest(MirroredSolrRequest request, String topicName) throws MirroringException {
- if (log.isDebugEnabled()) {
- log.debug("About to submit a MirroredSolrRequest");
- }
-
- final long enqueueStartNanos = System.nanoTime();
-
- // Create Producer record
- try {
+ private void submitRequest(MirroredSolrRequest<?> request, String topicName)
+ throws MirroringException {
+ if (log.isDebugEnabled()) {
+ log.debug("About to submit a MirroredSolrRequest");
+ }
- producer.send(new ProducerRecord<>(topicName, request), (metadata, exception) -> {
- if (exception != null) {
- log.error("Failed adding update to CrossDC queue! request=" + request.getSolrRequest(), exception);
- }
- });
+ final long enqueueStartNanos = System.nanoTime();
- long lastSuccessfulEnqueueNanos = System.nanoTime();
- // Record time since last successful enqueue as 0
- long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
- // Update elapsed time
+ // Create Producer record
+ try {
- if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
- slowSubmitAction(request, elapsedTimeMillis);
+ producer.send(
+ new ProducerRecord<>(topicName, request),
+ (metadata, exception) -> {
+ if (exception != null) {
+ log.error(
+ "Failed adding update to CrossDC queue! request=" +
request.getSolrRequest(),
+ exception);
}
- } catch (Exception e) {
- // We are intentionally catching all exceptions, the expected exception form this function is {@link MirroringException}
- String message = "Unable to enqueue request " + request + ", configured retries is" + conf.getInt(KafkaCrossDcConf.NUM_RETRIES) +
- " and configured max delivery timeout in ms is " + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
- log.error(message, e);
- throw new MirroringException(message, e);
- }
+ });
+
+ long lastSuccessfulEnqueueNanos = System.nanoTime();
+ // Record time since last successful enqueue as 0
+ long elapsedTimeMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - enqueueStartNanos);
+ // Update elapsed time
+
+ if (elapsedTimeMillis > conf.getInt(SLOW_SUBMIT_THRESHOLD_MS)) {
+ slowSubmitAction(request, elapsedTimeMillis);
+ }
+ } catch (Exception e) {
+ // We are intentionally catching all exceptions, the expected exception form this function is
+ // {@link MirroringException}
+ String message =
+ "Unable to enqueue request "
+ + request
+ + ", configured retries is"
+ + conf.getInt(KafkaCrossDcConf.NUM_RETRIES)
+ + " and configured max delivery timeout in ms is "
+ + conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS);
+ log.error(message, e);
+ throw new MirroringException(message, e);
}
-
- /**
- * Create and init the producer using {@link this#conf}
- * All producer configs are listed here https://kafka.apache.org/documentation/#producerconfigs
- *
- * @return
- */
- private Producer<String, MirroredSolrRequest> initProducer() {
- // Initialize and return Kafka producer
- Properties kafkaProducerProps = new Properties();
-
- log.info("Starting CrossDC Producer {}", conf);
-
- kafkaProducerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
-
- kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
- String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
- if (retries != null) {
- kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries));
- }
- kafkaProducerProps.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
- kafkaProducerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS));
- kafkaProducerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
- kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
- kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
- kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS));
- kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS)); // should be less than time that causes consumer to be kicked out
- kafkaProducerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, conf.get(KafkaCrossDcConf.ENABLE_DATA_COMPRESSION));
-
- kafkaProducerProps.put("key.serializer", StringSerializer.class.getName());
- kafkaProducerProps.put("value.serializer", MirroredSolrRequestSerializer.class.getName());
-
- KafkaCrossDcConf.addSecurityProps(conf, kafkaProducerProps);
-
- kafkaProducerProps.putAll(conf.getAdditionalProperties());
-
- if (log.isDebugEnabled()) {
- log.debug("Kafka Producer props={}", kafkaProducerProps);
- }
-
- ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(null);
- Producer<String, MirroredSolrRequest> producer;
- try {
- producer = new KafkaProducer<>(kafkaProducerProps);
- } finally {
- Thread.currentThread().setContextClassLoader(originalContextClassLoader);
- }
- return producer;
- }
-
- private KafkaConsumer<String, MirroredSolrRequest> initConsumer() {
- final Properties kafkaConsumerProperties = new Properties();
-
- kafkaConsumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
- kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
- kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
- kafkaConsumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
- kafkaConsumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
- kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- kafkaConsumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- kafkaConsumerProperties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
- kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
- kafkaConsumerProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
- kafkaConsumerProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
- kafkaConsumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
- kafkaConsumerProperties.putAll(conf.getAdditionalProperties());
-
- return new KafkaConsumer<>(kafkaConsumerProperties, new StringDeserializer(), new MirroredSolrRequestSerializer());
+ }
+
+ /**
+ * Create and init the producer using {@link this#conf} All producer configs are listed here
+ * https://kafka.apache.org/documentation/#producerconfigs
+ *
+ * @return
+ */
+ private Producer<String, MirroredSolrRequest<?>> initProducer() {
+ // Initialize and return Kafka producer
+ Properties kafkaProducerProps = new Properties();
+
+ log.info("Starting CrossDC Producer {}", conf);
+
+ kafkaProducerProps.put(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+
+ kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ String retries = conf.get(KafkaCrossDcConf.NUM_RETRIES);
+ if (retries != null) {
+ kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.parseInt(retries));
}
-
- private void slowSubmitAction(Object request, long elapsedTimeMillis) {
- log.warn("Enqueuing the request to Kafka took more than {} millis.
enqueueElapsedTime={}",
- conf.get(KafkaCrossDcConf.SLOW_SUBMIT_THRESHOLD_MS),
- elapsedTimeMillis);
+ kafkaProducerProps.put(
+ ProducerConfig.RETRY_BACKOFF_MS_CONFIG, conf.getInt(KafkaCrossDcConf.RETRY_BACKOFF_MS));
+ kafkaProducerProps.put(
+ ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,
+ conf.getInt(KafkaCrossDcConf.DELIVERY_TIMEOUT_MS));
+ kafkaProducerProps.put(
+ ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
+ conf.getInt(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES));
+ kafkaProducerProps.put(
+ ProducerConfig.BATCH_SIZE_CONFIG, conf.getInt(KafkaCrossDcConf.BATCH_SIZE_BYTES));
+ kafkaProducerProps.put(
+ ProducerConfig.BUFFER_MEMORY_CONFIG, conf.getInt(KafkaCrossDcConf.BUFFER_MEMORY_BYTES));
+ kafkaProducerProps.put(
+ ProducerConfig.LINGER_MS_CONFIG, conf.getInt(KafkaCrossDcConf.LINGER_MS));
+ kafkaProducerProps.put(
+ ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,
+ conf.getInt(
+ KafkaCrossDcConf
+ .REQUEST_TIMEOUT_MS)); // should be less than time that causes consumer to be kicked
Review Comment:
Yeah this can probably be made prettier too
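e.g. pull the value into a local so the comment can sit on its own line (rough sketch, untested; the local name is just illustrative):

```java
// Should be less than the time that causes the consumer to be kicked out.
int requestTimeoutMs = conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS);
kafkaProducerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
```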
##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/update/processor/MirroringUpdateProcessor.java:
##########
@@ -263,58 +305,66 @@ public void processDelete(final DeleteUpdateCommand cmd) throws IOException {
throw new SolrException(SERVER_ERROR, "mirror submit failed", e);
}
}
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("processDelete doMirroring={} cmd={}", true, cmd);
+ }
}
-
}
}
- private static void processDBQResults(SolrClient client, String collection, String uniqueField,
- QueryResponse rsp)
+ private static void processDBQResults(
+ SolrClient client, String collection, String uniqueField, QueryResponse rsp)
throws SolrServerException, IOException {
SolrDocumentList results = rsp.getResults();
List<String> ids = new ArrayList<>(results.size());
- results.forEach(entries -> {
- String id = entries.getFirstValue(uniqueField).toString();
- ids.add(id);
- });
- if (ids.size() > 0) {
+ results.forEach(
+ entries -> {
+ String id = entries.getFirstValue(uniqueField).toString();
+ ids.add(id);
+ });
+ if (!ids.isEmpty()) {
client.deleteById(collection, ids);
}
}
boolean isLeader(SolrQueryRequest req, String id, String route, SolrInputDocument doc) {
- CloudDescriptor cloudDesc =
- req.getCore().getCoreDescriptor().getCloudDescriptor();
+ CloudDescriptor cloudDesc = req.getCore().getCoreDescriptor().getCloudDescriptor();
String collection = cloudDesc.getCollectionName();
ClusterState clusterState =
req.getCore().getCoreContainer().getZkController().getClusterState();
DocCollection coll = clusterState.getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(id, doc, route, req.getParams(), coll);
if (slice == null) {
- // No slice found. Most strict routers will have already thrown an exception, so a null return is
+ // No slice found. Most strict routers will have already thrown an exception, so a null
+ // return is
Review Comment:
Make prettier
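e.g. (sketch; the tail of the sentence is elided here as in the diff):

```java
// No slice found. Most strict routers will have already thrown an exception,
// so a null return is ...
```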
##########
solr/modules/cross-dc/src/test-files/configs/cloud-minimal.zip:
##########
Review Comment:
Do we actually need this?
##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java:
##########
@@ -39,8 +39,9 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String DEFAULT_MAX_REQUEST_SIZE = "5242880";
public static final String DEFAULT_ENABLE_DATA_COMPRESSION = "none";
private static final String DEFAULT_INDEX_UNMIRRORABLE_DOCS = "false";
- public static final String DEFAULT_SLOW_SEND_THRESHOLD= "1000";
- public static final String DEFAULT_NUM_RETRIES = null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC
+ public static final String DEFAULT_SLOW_SEND_THRESHOLD = "1000";
+ public static final String DEFAULT_NUM_RETRIES =
+ null; // by default, we control retries with DELIVERY_TIMEOUT_MS_DOC
Review Comment:
Let's put the comment on a separate line.
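Something like:

```java
// By default, we control retries with DELIVERY_TIMEOUT_MS_DOC.
public static final String DEFAULT_NUM_RETRIES = null;
```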
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java:
##########
@@ -29,345 +33,369 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.CrossDcConstants;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
/**
- * Message processor implements all the logic to process a MirroredSolrRequest.
- * It handles:
- * 1. Sending the update request to Solr
- * 2. Discarding or retrying failed requests
- * 3. Flagging requests for resubmission by the underlying consumer implementation.
+ * Message processor implements all the logic to process a MirroredSolrRequest. It handles: 1.
+ * Sending the update request to Solr 2. Discarding or retrying failed requests 3. Flagging requests
+ * for resubmission by the underlying consumer implementation.
*/
-public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
+public class SolrMessageProcessor extends MessageProcessor
+ implements IQueueHandler<MirroredSolrRequest<?>> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- final CloudSolrClient client;
+ private final MetricRegistry metrics =
+ SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
- private static final String VERSION_FIELD = "_version_";
+ final CloudSolrClient client;
- public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
- super(resubmitBackoffPolicy);
- this.client = client;
- }
+ private static final String VERSION_FIELD = "_version_";
- @Override
- public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
- try (final MDC.MDCCloseable mdc = MDC.putCloseable("collection", getCollectionFromRequest(mirroredSolrRequest))) {
- connectToSolrIfNeeded();
+ public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
+ super(resubmitBackoffPolicy);
+ this.client = client;
+ }
- // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring handler?
+ @Override
+ @SuppressWarnings("try")
+ public Result<MirroredSolrRequest<?>> handleItem(MirroredSolrRequest<?> mirroredSolrRequest) {
+ try (final MDC.MDCCloseable ignored =
+ MDC.putCloseable("collection",
getCollectionFromRequest(mirroredSolrRequest))) {
+ connectToSolrIfNeeded();
- return processMirroredRequest(mirroredSolrRequest);
- }
- }
+ // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring
+ // handler?
- private Result<MirroredSolrRequest> processMirroredRequest(MirroredSolrRequest request) {
- final Result<MirroredSolrRequest> result = handleSolrRequest(request);
- // Back-off before returning
- backoffIfNeeded(result, request.getType());
- return result;
+ return processMirroredRequest(mirroredSolrRequest);
}
+ }
- private Result<MirroredSolrRequest> handleSolrRequest(MirroredSolrRequest mirroredSolrRequest) {
-
- SolrRequest request = mirroredSolrRequest.getSolrRequest();
- final SolrParams requestParams = request.getParams();
+ private Result<MirroredSolrRequest<?>> processMirroredRequest(MirroredSolrRequest<?> request) {
+ final Result<MirroredSolrRequest<?>> result = handleSolrRequest(request);
+ // Back-off before returning
+ backoffIfNeeded(result, request.getType());
+ return result;
+ }
- if (log.isDebugEnabled()) {
- log.debug("handleSolrRequest start params={}", requestParams);
- }
+ private Result<MirroredSolrRequest<?>> handleSolrRequest(
+ MirroredSolrRequest<?> mirroredSolrRequest) {
- // TODO: isn't this handled by the mirroring handler?
-// final String shouldMirror = requestParams.get("shouldMirror");
-//
-// if ("false".equalsIgnoreCase(shouldMirror)) {
-// log.warn("Skipping mirrored request because shouldMirror is set
to false. request={}", requestParams);
Review Comment:
Should we just get rid of this commented block?
##########
solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaCrossDcConf.java:
##########
@@ -114,12 +115,18 @@ public class KafkaCrossDcConf extends CrossDcConf {
public static final String FETCH_MAX_BYTES = "solr.crossdc.fetchMaxBytes";
- // The maximum delay between invocations of poll() when using consumer group management. This places
- // an upper bound on the amount of time that the consumer can be idle before fetching more records.
- // If poll() is not called before expiration of this timeout, then the consumer is considered failed
- // and the group will rebalance in order to reassign the partitions to another member. For consumers
- // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not be
- // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be
+ // The maximum delay between invocations of poll() when using consumer group management. This
+ // places
+ // an upper bound on the amount of time that the consumer can be idle before fetching more
+ // records.
+ // If poll() is not called before expiration of this timeout, then the consumer is considered
+ // failed
+ // and the group will rebalance in order to reassign the partitions to another member. For
+ // consumers
+ // using a non-null <code>group.instance.id</code> which reach this timeout, partitions will not
+ // be
+ // immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will
+ // be
Review Comment:
Annoying, but should be easy to fix.
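Rewrapping by hand at ~100 columns should stop the formatter from stranding single words (sketch; trailing text elided as in the diff):

```java
// The maximum delay between invocations of poll() when using consumer group management. This
// places an upper bound on the amount of time that the consumer can be idle before fetching more
// records. If poll() is not called before expiration of this timeout, then the consumer is
// considered failed and the group will rebalance in order to reassign the partitions to another
// member. For consumers using a non-null <code>group.instance.id</code> which reach this timeout,
// partitions will not be immediately reassigned. Instead, the consumer will stop sending
// heartbeats and partitions will be ...
```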
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java:
##########
@@ -29,345 +33,369 @@
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
-import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.CrossDcConstants;
import org.apache.solr.crossdc.common.IQueueHandler;
import org.apache.solr.crossdc.common.MirroredSolrRequest;
+import org.apache.solr.crossdc.common.ResubmitBackoffPolicy;
import org.apache.solr.crossdc.common.SolrExceptionUtil;
-import org.apache.solr.crossdc.consumer.Consumer;
+import org.apache.solr.crossdc.manager.consumer.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
-import java.lang.invoke.MethodHandles;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
/**
- * Message processor implements all the logic to process a MirroredSolrRequest.
- * It handles:
- * 1. Sending the update request to Solr
- * 2. Discarding or retrying failed requests
- * 3. Flagging requests for resubmission by the underlying consumer implementation.
+ * Message processor implements all the logic to process a MirroredSolrRequest. It handles: 1.
+ * Sending the update request to Solr 2. Discarding or retrying failed requests 3. Flagging requests
+ * for resubmission by the underlying consumer implementation.
*/
-public class SolrMessageProcessor extends MessageProcessor implements IQueueHandler<MirroredSolrRequest> {
- private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- private final MetricRegistry metrics = SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
+public class SolrMessageProcessor extends MessageProcessor
+ implements IQueueHandler<MirroredSolrRequest<?>> {
+ private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- final CloudSolrClient client;
+ private final MetricRegistry metrics =
+ SharedMetricRegistries.getOrCreate(Consumer.METRICS_REGISTRY);
- private static final String VERSION_FIELD = "_version_";
+ final CloudSolrClient client;
- public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
- super(resubmitBackoffPolicy);
- this.client = client;
- }
+ private static final String VERSION_FIELD = "_version_";
- @Override
- public Result<MirroredSolrRequest> handleItem(MirroredSolrRequest mirroredSolrRequest) {
- try (final MDC.MDCCloseable mdc = MDC.putCloseable("collection", getCollectionFromRequest(mirroredSolrRequest))) {
- connectToSolrIfNeeded();
+ public SolrMessageProcessor(CloudSolrClient client, ResubmitBackoffPolicy resubmitBackoffPolicy) {
+ super(resubmitBackoffPolicy);
+ this.client = client;
+ }
- // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring handler?
+ @Override
+ @SuppressWarnings("try")
+ public Result<MirroredSolrRequest<?>> handleItem(MirroredSolrRequest<?> mirroredSolrRequest) {
+ try (final MDC.MDCCloseable ignored =
+ MDC.putCloseable("collection",
getCollectionFromRequest(mirroredSolrRequest))) {
+ connectToSolrIfNeeded();
- return processMirroredRequest(mirroredSolrRequest);
- }
- }
+ // preventCircularMirroring(mirroredSolrRequest); TODO: isn't this handled by the mirroring
Review Comment:
Move the TODO above the commented-out call.
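i.e.:

```java
// TODO: isn't this handled by the mirroring handler?
// preventCircularMirroring(mirroredSolrRequest);
```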
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final ThreadPoolExecutor executor;
- private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> {
- Thread t = new Thread(r);
- t.setName("offset-check-thread");
- return t;
- });
- private PartitionManager partitionManager;
-
- private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
-
+ private final ExecutorService offsetCheckExecutor =
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ r -> {
Review Comment:
This can probably be made more succinct.
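e.g. the factory lambda can collapse to one line (untested):

```java
private final ExecutorService offsetCheckExecutor =
    ExecutorUtil.newMDCAwareCachedThreadPool(r -> new Thread(r, "offset-check-thread"));
```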
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final ThreadPoolExecutor executor;
- private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> {
- Thread t = new Thread(r);
- t.setName("offset-check-thread");
- return t;
- });
- private PartitionManager partitionManager;
-
- private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
-
+ private final ExecutorService offsetCheckExecutor =
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ r -> {
+ Thread t = new Thread(r, "offset-check-thread");
+ return t;
+ });
+ private final PartitionManager partitionManager;
+ private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
/**
- * @param conf The Kafka consumer configuration
+ * @param conf The Kafka consumer configuration
* @param startLatch
*/
 public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
 this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
 this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
- this.collapseUpdates = CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
+ this.collapseUpdates =
+ CrossDcConf.CollapseUpdates.getOrDefault(
+ conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
 this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
this.startLatch = startLatch;
final Properties kafkaConsumerProps = new Properties();
- kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
- kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
- kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
+ conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
- kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
 kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
- kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+ kafkaConsumerProps.put(
+ ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
- kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
- kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+ kafkaConsumerProps.put(
+ ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+ kafkaConsumerProps.put(
+ ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+ conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
- kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
kafkaConsumerProps.putAll(conf.getAdditionalProperties());
int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
- executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("KafkaCrossDcConsumerWorker");
- return t;
- }
- });
+ executor =
+ new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ threads,
+ threads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ queue,
+ r -> {
Review Comment:
This too
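Same idea here (untested sketch):

```java
executor =
    new ExecutorUtil.MDCAwareThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS, queue,
        r -> new Thread(r, "KafkaCrossDcConsumerWorker"));
```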
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -70,95 +89,107 @@ public class KafkaCrossDcConsumer extends Consumer.CrossDcConsumer {
private final ThreadPoolExecutor executor;
- private final ExecutorService offsetCheckExecutor = Executors.newCachedThreadPool(r -> {
- Thread t = new Thread(r);
- t.setName("offset-check-thread");
- return t;
- });
- private PartitionManager partitionManager;
-
- private BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
-
+ private final ExecutorService offsetCheckExecutor =
+ ExecutorUtil.newMDCAwareCachedThreadPool(
+ r -> {
+ Thread t = new Thread(r, "offset-check-thread");
+ return t;
+ });
+ private final PartitionManager partitionManager;
+ private final BlockingQueue<Runnable> queue = new BlockingQueue<>(10);
/**
- * @param conf The Kafka consumer configuration
+ * @param conf The Kafka consumer configuration
* @param startLatch
*/
 public KafkaCrossDcConsumer(KafkaCrossDcConf conf, CountDownLatch startLatch) {
 this.topicNames = conf.get(KafkaCrossDcConf.TOPIC_NAME).split(",");
 this.maxAttempts = conf.getInt(KafkaCrossDcConf.MAX_ATTEMPTS);
- this.collapseUpdates = CrossDcConf.CollapseUpdates.getOrDefault(conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
+ this.collapseUpdates =
+ CrossDcConf.CollapseUpdates.getOrDefault(
+ conf.get(CrossDcConf.COLLAPSE_UPDATES), CrossDcConf.CollapseUpdates.PARTIAL);
 this.maxCollapseRecords = conf.getInt(KafkaCrossDcConf.MAX_COLLAPSE_RECORDS);
this.startLatch = startLatch;
final Properties kafkaConsumerProps = new Properties();
- kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.get(KafkaCrossDcConf.BOOTSTRAP_SERVERS));
 kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, conf.get(KafkaCrossDcConf.GROUP_ID));
- kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_POLL_RECORDS));
- kafkaConsumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
+ conf.get(KafkaCrossDcConf.MAX_POLL_INTERVAL_MS));
- kafkaConsumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, conf.get(KafkaCrossDcConf.SESSION_TIMEOUT_MS));
 kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- kafkaConsumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
- kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MIN_BYTES));
+ kafkaConsumerProps.put(
+ ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_WAIT_MS));
- kafkaConsumerProps.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
- kafkaConsumerProps.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
+ kafkaConsumerProps.put(
+ ConsumerConfig.FETCH_MAX_BYTES_CONFIG, conf.getInt(KafkaCrossDcConf.FETCH_MAX_BYTES));
+ kafkaConsumerProps.put(
+ ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
+ conf.getInt(KafkaCrossDcConf.MAX_PARTITION_FETCH_BYTES));
- kafkaConsumerProps.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
+ kafkaConsumerProps.put(
+ ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, conf.getInt(KafkaCrossDcConf.REQUEST_TIMEOUT_MS));
KafkaCrossDcConf.addSecurityProps(conf, kafkaConsumerProps);
kafkaConsumerProps.putAll(conf.getAdditionalProperties());
int threads = conf.getInt(KafkaCrossDcConf.CONSUMER_PROCESSING_THREADS);
- executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, queue, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("KafkaCrossDcConsumerWorker");
- return t;
- }
- });
+ executor =
+ new ExecutorUtil.MDCAwareThreadPoolExecutor(
+ threads,
+ threads,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ queue,
+ r -> {
+ Thread t = new Thread(r, "KafkaCrossDcConsumerWorker");
+ return t;
+ });
executor.prestartAllCoreThreads();
solrClient = createSolrClient(conf);
messageProcessor = createSolrMessageProcessor();
-
-
log.info("Creating Kafka consumer with configuration {}",
kafkaConsumerProps);
kafkaConsumer = createKafkaConsumer(kafkaConsumerProps);
partitionManager = new PartitionManager(kafkaConsumer);
// Create producer for resubmitting failed requests
log.info("Creating Kafka resubmit producer");
this.kafkaMirroringSink = createKafkaMirroringSink(conf);
log.info("Created Kafka resubmit producer");
-
}
protected SolrMessageProcessor createSolrMessageProcessor() {
return new SolrMessageProcessor(solrClient, resubmitRequest -> 0L);
}
- public KafkaConsumer<String,MirroredSolrRequest> createKafkaConsumer(Properties properties) {
- return new KafkaConsumer<>(properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
+ public KafkaConsumer<String, MirroredSolrRequest<?>> createKafkaConsumer(Properties properties) {
+ return new KafkaConsumer<>(
+ properties, new StringDeserializer(), new MirroredSolrRequestSerializer());
}
/**
- * This is where the magic happens.
- * 1. Polls and gets the packets from the queue
- * 2. Extract the MirroredSolrRequest objects
- * 3. Send the request to the MirroredSolrRequestHandler that has the processing, retry, error handling logic.
+ * This is where the magic happens. 1. Polls and gets the packets from the queue 2. Extract the
Review Comment:
Make prettier
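An HTML list should survive the formatter (untested sketch):

```java
/**
 * This is where the magic happens.
 *
 * <ol>
 *   <li>Polls and gets the packets from the queue
 *   <li>Extract the MirroredSolrRequest objects
 *   <li>Send the request to the MirroredSolrRequestHandler that has the processing, retry,
 *       error handling logic.
 * </ol>
 */
```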
##########
solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java:
##########
@@ -253,18 +290,20 @@ boolean pollAndProcessRequests() {
}
// it's an update but with different params
Review Comment:
Maybe make this less confusing? (in terms of spacing)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]