mlbiscoc commented on code in PR #4063: URL: https://github.com/apache/solr/pull/4063#discussion_r2729107142
########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); + + protected SolrMetricManager metricManager; + + protected LongCounter input; + protected LongCounter collapsed; + protected LongCounter output; + protected LongHistogram outputBatchSizeHistogram; + protected LongHistogram outputTimeHistogram; + protected LongHistogram outputBackoffHistogram; + protected LongHistogram outputFirstAttemptHistogram; + + public OtelMetrics() { + register(REGISTRY); + } + + protected void register(String scope) { + this.metricManager = new SolrMetricManager(new OtlpExporterFactory().getExporter()); + SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, scope); + + input = + metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages"); + + collapsed = + metricsContext.longCounter( + NAME_PREFIX + "collapsed_total", "Total number of collapsed messages"); + + output = + metricsContext.longCounter(NAME_PREFIX + "output_total", "Total number of output requests"); + + outputBatchSizeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_batch_size", "Histogram of output batch sizes"); + + outputBackoffHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_backoff_size", "Histogram of output backoff sleep times"); + + outputTimeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_time", + "Histogram of output request times", + OtelUnit.MILLISECONDS); + + outputFirstAttemptHistogram = + 
metricsContext.longHistogram( + NAME_PREFIX + "output_first_attempt_time", + "Histogram of first attempt request times", + OtelUnit.NANOSECONDS); + } + + protected static final String KEY_SEPARATOR = "#"; + + protected Attributes attr(String key1, String value1) { + String key = key1 + KEY_SEPARATOR + value1; + return attributesCache.computeIfAbsent( + key, k -> Attributes.builder().put(key1, value1).build()); + } + + protected Attributes attr(String key1, String value1, String key2, String value2) { + String key = key1 + KEY_SEPARATOR + value1 + KEY_SEPARATOR + key2 + KEY_SEPARATOR + value2; + return attributesCache.computeIfAbsent( + key, k -> Attributes.builder().put(key1, value1).put(key2, value2).build()); + } + + public SolrMetricManager getMetricManager() { + return metricManager; + } + + @Override + public void incrementCollapsedCounter() { + collapsed.add(1L); + } + + @Override + public void incrementInputCounter(String type, String subType) { + incrementInputCounter(type, subType, 1); + } + + @Override + public void incrementInputCounter(String type, String subType, int delta) { + input.add(delta, attr("type", type, "subtype", subType)); + } + + @Override + public void incrementOutputCounter(String type, String result) { + incrementOutputCounter(type, result, 1); + } + + @Override + public void incrementOutputCounter(String type, String result, int delta) { + output.add(delta, attr("type", type, "result", result)); + } + + @Override + public void recordOutputBatchSize(MirroredSolrRequest.Type type, SolrRequest<?> solrRequest) { + if (type != MirroredSolrRequest.Type.UPDATE) { + outputBatchSizeHistogram.record( + 1, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, solrRequest.getPath())); + return; + } + UpdateRequest req = (UpdateRequest) solrRequest; + int addCount = req.getDocuments() == null ? 0 : req.getDocuments().size(); + int dbiCount = req.getDeleteById() == null ? 0 : req.getDeleteById().size(); + int dbqCount = req.getDeleteQuery() == null ? 0 : req.getDeleteQuery().size(); + if (addCount > 0) { + outputBatchSizeHistogram.record(addCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "add")); + } + if (dbiCount > 0) { + outputBatchSizeHistogram.record( + dbiCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "delete_by_id")); + } + if (dbqCount > 0) { + outputBatchSizeHistogram.record( + dbiCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "delete_by_query")); Review Comment: This should be dbqCount. ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); + + protected SolrMetricManager metricManager; + + protected LongCounter input; + protected LongCounter collapsed; + protected LongCounter output; + protected LongHistogram outputBatchSizeHistogram; + protected LongHistogram outputTimeHistogram; + protected LongHistogram outputBackoffHistogram; + protected LongHistogram outputFirstAttemptHistogram; + + public OtelMetrics() { + register(REGISTRY); + } + + protected void register(String scope) { + this.metricManager = new SolrMetricManager(new OtlpExporterFactory().getExporter()); + SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, scope); + + input = + metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages"); + + collapsed = + metricsContext.longCounter( + NAME_PREFIX + "collapsed_total", "Total number of collapsed messages"); + + output = + metricsContext.longCounter(NAME_PREFIX + "output_total", "Total number of output requests"); + + outputBatchSizeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_batch_size", "Histogram of output batch sizes"); + + outputBackoffHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_backoff_size", "Histogram of output backoff sleep times"); + + outputTimeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_time", + "Histogram of output request times", + OtelUnit.MILLISECONDS); + + outputFirstAttemptHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_first_attempt_time", + "Histogram of first attempt request times", + OtelUnit.NANOSECONDS); + } + + protected static final String KEY_SEPARATOR = "#"; + + protected Attributes attr(String key1, String value1) { + String key = key1 + KEY_SEPARATOR + value1; + return attributesCache.computeIfAbsent( + key, k -> Attributes.builder().put(key1, value1).build()); + } + + protected Attributes attr(String key1, String value1, String key2, String value2) { + String key = key1 + KEY_SEPARATOR + value1 + KEY_SEPARATOR + key2 + KEY_SEPARATOR + value2; + return attributesCache.computeIfAbsent( + key, k -> Attributes.builder().put(key1, value1).put(key2, value2).build()); + } + + public SolrMetricManager getMetricManager() { + return metricManager; + } + + @Override + public void incrementCollapsedCounter() { + collapsed.add(1L); + } + + @Override + public void incrementInputCounter(String type, String 
subType) { + incrementInputCounter(type, subType, 1); + } + + @Override + public void incrementInputCounter(String type, String subType, int delta) { + input.add(delta, attr("type", type, "subtype", subType)); + } + + @Override + public void incrementOutputCounter(String type, String result) { + incrementOutputCounter(type, result, 1); + } + + @Override + public void incrementOutputCounter(String type, String result, int delta) { + output.add(delta, attr("type", type, "result", result)); + } + + @Override + public void recordOutputBatchSize(MirroredSolrRequest.Type type, SolrRequest<?> solrRequest) { + if (type != MirroredSolrRequest.Type.UPDATE) { + outputBatchSizeHistogram.record( + 1, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, solrRequest.getPath())); Review Comment: `solrRequest.getPath()` -> what is the cardinality of this? Probably not that concerning, and I assume it is low, but if there are many different paths this can get big. ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); Review Comment: What is this for?
########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -532,29 +533,29 @@ protected void processResult( "Sending message to dead letter queue because of max attempts limit with current value = {}", attempt); kafkaMirroringSink.submitToDlq(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); + metrics.incrementOutputCounter(type.name(), "failed-dlq"); } else { kafkaMirroringSink.submit(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + metrics.incrementOutputCounter(type.name(), "failed-resubmit"); Review Comment: Also output looks to be more of a result rather than type. I think the name for this key should be `result` ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/Util.java: ########## @@ -34,41 +28,49 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.solr.common.params.SolrParams; import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer; +import org.apache.solr.handler.admin.MetricsHandler; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.response.PrometheusResponseWriter; +import org.apache.solr.response.SolrQueryResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Util { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - @SuppressWarnings("rawtypes") - public static void logMetrics(MetricRegistry metricRegistry) { - log.info("Metrics Registry:"); - for (Map.Entry<String, Gauge> entry : metricRegistry.getGauges().entrySet()) { - if (log.isInfoEnabled()) { - log.info("Gauge {}: {}", entry.getKey(), entry.getValue().getValue()); - } - } - for (Map.Entry<String, Counter> entry : metricRegistry.getCounters().entrySet()) { - if (log.isInfoEnabled()) { - log.info("Counter {}: {}", entry.getKey(), entry.getValue().getCount()); - } - } - for (Map.Entry<String, Histogram> entry : metricRegistry.getHistograms().entrySet()) { - if (log.isInfoEnabled()) { - log.info("Histogram {}: {}", entry.getKey(), entry.getValue().getSnapshot().toString()); - } - } - for (Map.Entry<String, Meter> entry : metricRegistry.getMeters().entrySet()) { - if (log.isInfoEnabled()) { - log.info("Meter {}: {}", entry.getKey(), entry.getValue().getCount()); - } - } - for (Map.Entry<String, Timer> entry : metricRegistry.getTimers().entrySet()) { - if (log.isInfoEnabled()) { - log.info("Timer {}: {}", entry.getKey(), entry.getValue().getSnapshot().toString()); - } + public static void logMetrics(SolrMetricManager metricManager) { + SolrQueryResponse rsp = new SolrQueryResponse(); + new MetricsHandler(metricManager) + .handleRequest(SolrParams.of(), (key, value) -> rsp.add(key, value)); + String output; + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + new PrometheusResponseWriter() + .write(baos, null, rsp, PrometheusResponseWriter.CONTENT_TYPE_PROMETHEUS); + output = baos.toString(); + } catch (Exception e) { + log.error("Error while writing final metrics", e); + output = rsp.toString(); } + log.info("#### Consumer Metrics: ####\n{}", output); + // List<MetricSnapshot> snapshotList = + // metricManager.getPrometheusMetricReaders().values().stream() + // .flatMap(r -> r.collect().stream()) + // .toList(); + // MetricSnapshots snapshots = MetricSnapshots.of(snapshotList.toArray(new + // 
MetricSnapshot[0])); + // String output; + // try { + // ByteArrayOutputStream baos = new ByteArrayOutputStream(); + // new PrometheusTextFormatWriter(false).write(baos, snapshots); + // output = baos.toString(); + // } catch (Exception e) { + // log.error("Error while writing final metrics", e); + // output = snapshots.toString(); + // } + // log.info("#### Consumer Metrics: ####\n{}", output); Review Comment: Delete this? ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); + + protected SolrMetricManager metricManager; + + protected LongCounter input; + protected LongCounter collapsed; + protected LongCounter output; + protected LongHistogram outputBatchSizeHistogram; + protected LongHistogram outputTimeHistogram; + protected LongHistogram outputBackoffHistogram; + protected LongHistogram outputFirstAttemptHistogram; + + public OtelMetrics() { + register(REGISTRY); + } + + protected void register(String scope) { + this.metricManager = new SolrMetricManager(new OtlpExporterFactory().getExporter()); + SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, scope); + + input = + metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages"); + + collapsed = + metricsContext.longCounter( + NAME_PREFIX + "collapsed_total", "Total number of collapsed messages"); + + output = + metricsContext.longCounter(NAME_PREFIX + "output_total", "Total number of output requests"); + + outputBatchSizeHistogram = + metricsContext.longHistogram( + 
NAME_PREFIX + "output_batch_size", "Histogram of output batch sizes"); + + outputBackoffHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_backoff_size", "Histogram of output backoff sleep times"); + + outputTimeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_time", + "Histogram of output request times", + OtelUnit.MILLISECONDS); + + outputFirstAttemptHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_first_attempt_time", + "Histogram of first attempt request times", + OtelUnit.NANOSECONDS); Review Comment: A nitpick, but I think we should try to keep request times in one unit or the other as much as possible unless needed. Making dashboards or aggregations across metrics that mix nanoseconds and milliseconds can get annoying. This is not a hard blocker though if you think we need nano and milli. ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); + + protected SolrMetricManager metricManager; + + protected LongCounter input; + protected LongCounter collapsed; + protected LongCounter output; + protected LongHistogram outputBatchSizeHistogram; + protected LongHistogram outputTimeHistogram; + protected LongHistogram outputBackoffHistogram; + protected LongHistogram outputFirstAttemptHistogram; + + public OtelMetrics() { + register(REGISTRY); + } + + protected void register(String scope) { + this.metricManager = new SolrMetricManager(new OtlpExporterFactory().getExporter()); + SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, scope); + + input = + metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages"); + + collapsed = + metricsContext.longCounter( + NAME_PREFIX + "collapsed_total", "Total number of collapsed messages"); + + output = + metricsContext.longCounter(NAME_PREFIX + "output_total", "Total number of output requests"); + + outputBatchSizeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_batch_size", "Histogram of output batch sizes"); + + outputBackoffHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_backoff_size", "Histogram of output backoff sleep times"); Review Comment: This is a time but you called it size. ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -532,29 +533,29 @@ protected void processResult( "Sending message to dead letter queue because of max attempts limit with current value = {}", attempt); kafkaMirroringSink.submitToDlq(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); + metrics.incrementOutputCounter(type.name(), "failed-dlq"); } else { kafkaMirroringSink.submit(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + metrics.incrementOutputCounter(type.name(), "failed-resubmit"); Review Comment: Another thing you could potentially do is change this metric. Instead, have 2: success/failure. If success, then `successfully_outputs` or some name like that; for failures or errors, call it `failed_outputs` with a `type` label differentiating between the failures: `failed submit`, `failed retry`, etc.
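For illustration only, a rough sketch of what that split could look like using the helpers already in `OtelMetrics` (the metric names below are placeholders, not a settled proposal, and `type` is assumed to be the request type string passed into the existing `incrementOutputCounter`):

```java
// Illustrative sketch only -- placeholder names, reusing the longCounter(...) and attr(...)
// helpers already present in OtelMetrics.
LongCounter outputSucceeded =
    metricsContext.longCounter(
        NAME_PREFIX + "output_success_total", "Total number of successfully mirrored requests");
LongCounter outputFailed =
    metricsContext.longCounter(
        NAME_PREFIX + "output_failed_total", "Total number of failed mirrored requests");

// Success path: one counter, labelled only by request type.
outputSucceeded.add(1L, attr(ATTR_TYPE, type));
// Failure paths: one counter, with the failure mode carried in the "result" label
// rather than baked into the metric name.
outputFailed.add(1L, attr(ATTR_TYPE, type, ATTR_RESULT, "failed_dlq"));
outputFailed.add(1L, attr(ATTR_TYPE, type, ATTR_RESULT, "failed_resubmit"));
```

The main win is that dashboards can sum `output_failed_total` across all failure modes or filter on the `result` label, instead of enumerating per-name counters.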
########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); + + protected SolrMetricManager metricManager; + + protected LongCounter input; + protected LongCounter collapsed; + protected LongCounter output; + protected LongHistogram outputBatchSizeHistogram; + protected LongHistogram outputTimeHistogram; + protected LongHistogram outputBackoffHistogram; + protected LongHistogram outputFirstAttemptHistogram; + + public OtelMetrics() { + register(REGISTRY); + } + + protected void register(String scope) { + this.metricManager = new SolrMetricManager(new OtlpExporterFactory().getExporter()); + SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, scope); + + input = + metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages"); Review Comment: I am not liking this metric very much. It is a bit confusing. For any non-update I see above that you are just incrementing by 1, but at the same time this metric is incrementing by the number of docs and/or deleted docs? The help message doesn't reflect that. I think it should be consistent; otherwise the way these increment is confusing. Make 2 metric names if one is the number of requests and the other is the number of docs/deleted docs.
Maybe call one `inputItems` and the other `requests`/`messages`. ########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/OtelMetrics.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.crossdc.manager.consumer; + +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.crossdc.common.MirroredSolrRequest; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.opentelemetry.OtlpExporterFactory; +import org.apache.solr.util.RTimer; + +public class OtelMetrics implements ConsumerMetrics { + + public static final String REGISTRY = "crossdc.consumer.registry"; + public static final String NAME_PREFIX = "crossdc_consumer_"; + public static final String ATTR_TYPE = "type"; + public static final String ATTR_SUBTYPE = "subtype"; + public static final String ATTR_RESULT = "result"; + + protected final Map<String, Attributes> attributesCache = new ConcurrentHashMap<>(); + + protected SolrMetricManager metricManager; + + protected LongCounter input; + protected LongCounter collapsed; + protected LongCounter output; + protected LongHistogram outputBatchSizeHistogram; + protected LongHistogram outputTimeHistogram; + protected LongHistogram outputBackoffHistogram; + protected LongHistogram outputFirstAttemptHistogram; + + public OtelMetrics() { + register(REGISTRY); + } + + protected void register(String scope) { + this.metricManager = new SolrMetricManager(new OtlpExporterFactory().getExporter()); + SolrMetricsContext metricsContext = new SolrMetricsContext(metricManager, scope); + + input = + metricsContext.longCounter(NAME_PREFIX + "input_total", "Total number of input messages"); + + collapsed = + metricsContext.longCounter( + NAME_PREFIX + "collapsed_total", "Total number of collapsed messages"); + + output = + metricsContext.longCounter(NAME_PREFIX + "output_total", "Total number of output requests"); + + outputBatchSizeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_batch_size", "Histogram of output batch sizes"); + + outputBackoffHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_backoff_size", "Histogram of output backoff sleep times"); + + outputTimeHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_time", + "Histogram of output request times", + OtelUnit.MILLISECONDS); + 
+ outputFirstAttemptHistogram = + metricsContext.longHistogram( + NAME_PREFIX + "output_first_attempt_time", + "Histogram of first attempt request times", + OtelUnit.NANOSECONDS); + } + + protected static final String KEY_SEPARATOR = "#"; + + protected Attributes attr(String key1, String value1) { + String key = key1 + KEY_SEPARATOR + value1; + return attributesCache.computeIfAbsent( + key, k -> Attributes.builder().put(key1, value1).build()); + } + + protected Attributes attr(String key1, String value1, String key2, String value2) { + String key = key1 + KEY_SEPARATOR + value1 + KEY_SEPARATOR + key2 + KEY_SEPARATOR + value2; + return attributesCache.computeIfAbsent( + key, k -> Attributes.builder().put(key1, value1).put(key2, value2).build()); + } + + public SolrMetricManager getMetricManager() { + return metricManager; + } + + @Override + public void incrementCollapsedCounter() { + collapsed.add(1L); + } + + @Override + public void incrementInputCounter(String type, String subType) { + incrementInputCounter(type, subType, 1); + } + + @Override + public void incrementInputCounter(String type, String subType, int delta) { + input.add(delta, attr("type", type, "subtype", subType)); + } + + @Override + public void incrementOutputCounter(String type, String result) { + incrementOutputCounter(type, result, 1); + } + + @Override + public void incrementOutputCounter(String type, String result, int delta) { + output.add(delta, attr("type", type, "result", result)); + } + + @Override + public void recordOutputBatchSize(MirroredSolrRequest.Type type, SolrRequest<?> solrRequest) { + if (type != MirroredSolrRequest.Type.UPDATE) { + outputBatchSizeHistogram.record( + 1, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, solrRequest.getPath())); + return; + } + UpdateRequest req = (UpdateRequest) solrRequest; + int addCount = req.getDocuments() == null ? 0 : req.getDocuments().size(); + int dbiCount = req.getDeleteById() == null ? 0 : req.getDeleteById().size(); + int dbqCount = req.getDeleteQuery() == null ? 0 : req.getDeleteQuery().size(); + if (addCount > 0) { + outputBatchSizeHistogram.record(addCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "add")); + } + if (dbiCount > 0) { + outputBatchSizeHistogram.record( + dbiCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "delete_by_id")); + } + if (dbqCount > 0) { + outputBatchSizeHistogram.record( + dbiCount, attr(ATTR_TYPE, type.name(), ATTR_SUBTYPE, "delete_by_query")); + } + } + + @Override + public void recordOutputBackoffTime(MirroredSolrRequest.Type type, long backoffTimeMs) { + outputBackoffHistogram.record(backoffTimeMs, attr(ATTR_TYPE, type.name())); + } + + @Override + public void recordOutputFirstAttemptTime(MirroredSolrRequest.Type type, long firstAttemptTimeNs) { + outputFirstAttemptHistogram.record(firstAttemptTimeNs, attr(ATTR_TYPE, type.name())); + } + + @Override + public ConsumerTimer startOutputTimeTimer(final String requestType) { + final RTimer timer = + new RTimer(TimeUnit.MILLISECONDS) { + @Override + public double stop() { + double elapsedTime = super.stop(); + outputTimeHistogram.record( + Double.valueOf(elapsedTime).longValue(), attr(ATTR_TYPE, requestType)); + return elapsedTime; + } + }; + return () -> timer.stop(); + } Review Comment: I had created these wrappers for doing this kind of thing. See `AttributedLongTimer`. Could you not use this? 
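For context, the usage pattern being suggested might look roughly like the sketch below; the exact `AttributedLongTimer` constructor and method names are assumptions here, so check them against the actual class before adopting it:

```java
// Assumed API, for illustration only -- not the verified AttributedLongTimer signature.
// The idea: bind the histogram and its attributes once in a reusable timer wrapper
// instead of building an anonymous RTimer subclass at every call site.
AttributedLongTimer outputTimer =
    new AttributedLongTimer(outputTimeHistogram, attr(ATTR_TYPE, requestType)); // assumed ctor
var measurement = outputTimer.start(); // assumed to return a per-measurement handle
// ... send the mirrored request ...
measurement.stop(); // assumed to record the elapsed time into outputTimeHistogram
```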
########## solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java: ########## @@ -532,29 +533,29 @@ protected void processResult( "Sending message to dead letter queue because of max attempts limit with current value = {}", attempt); kafkaMirroringSink.submitToDlq(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); + metrics.incrementOutputCounter(type.name(), "failed-dlq"); } else { kafkaMirroringSink.submit(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + metrics.incrementOutputCounter(type.name(), "failed-resubmit"); Review Comment: Use underscores instead of hyphens. I think that is the standard for OTEL and Prometheus. ########## gradle/libs.versions.toml: ########## @@ -498,6 +498,8 @@ ow2-asm-commons = { module = "org.ow2.asm:asm-commons", version.ref = "ow2-asm" ow2-asm-tree = { module = "org.ow2.asm:asm-tree", version.ref = "ow2-asm" } # @keep transitive dependency for version alignment perfmark-api = { module = "io.perfmark:perfmark-api", version.ref = "perfmark" } +prometheus-metrics-core = { module = "io.prometheus:prometheus-metrics-core", version.ref = "prometheus-metrics" } Review Comment: Thank you for doing that! So much better.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
