This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit 0c74109b522c3dd2219458dc664da802d24e5fd7 Author: Aaron Ai <[email protected]> AuthorDate: Tue Jul 12 21:11:48 2022 +0800 Java: refactor MessageMeter --- .../rocketmq/client/java/impl/ClientImpl.java | 8 +- .../java/impl/consumer/PushConsumerImpl.java | 2 +- .../rocketmq/client/java/metrics/MessageMeter.java | 215 +++++---------------- .../client/java/metrics/MessageMeterProvider.java | 182 +++++++++++++++++ .../rocketmq/client/java/metrics/Metric.java | 11 +- .../java/metrics/MetricMessageInterceptor.java | 51 +++-- 6 files changed, 272 insertions(+), 197 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index e47bdcd..0c801d5 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.hook.MessageHookPoints; import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus; import org.apache.rocketmq.client.java.hook.MessageInterceptor; import org.apache.rocketmq.client.java.message.MessageCommon; -import org.apache.rocketmq.client.java.metrics.MessageMeter; +import org.apache.rocketmq.client.java.metrics.MessageMeterProvider; import org.apache.rocketmq.client.java.metrics.Metric; import org.apache.rocketmq.client.java.misc.ExecutorServices; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; @@ -97,7 +97,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, // Thread-safe set. protected final Set<Endpoints> isolated; protected final ExecutorService clientCallbackExecutor; - protected final MessageMeter messageMeter; + protected final MessageMeterProvider messageMeterProvider; /** * Telemetry command executor, which is aims to execute commands from remote. */ @@ -147,7 +147,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, new LinkedBlockingQueue<>(), new ThreadFactoryImpl("ClientCallbackWorker")); - this.messageMeter = new MessageMeter(this); + this.messageMeterProvider = new MessageMeterProvider(this); this.telemetryCommandExecutor = new ThreadPoolExecutor( 1, @@ -317,7 +317,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, */ public final void onSettingsCommand(Endpoints endpoints, Settings settings) { final Metric metric = new Metric(settings.getMetric()); - messageMeter.refresh(metric); + messageMeterProvider.reset(metric); LOGGER.info("Receive settings from remote, endpoints={}", endpoints); this.getClientSettings().applySettingsCommand(settings); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java index 41648bd..9b34b56 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java @@ -153,7 +153,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach try { LOGGER.info("Begin to start the rocketmq push consumer, clientId={}", clientId); super.startUp(); - messageMeter.setMessageCacheObserver(this); + messageMeterProvider.setMessageCacheObserver(this); final ScheduledExecutorService scheduler = clientManager.getScheduler(); this.consumeService = createConsumeService(); this.consumeService.startAsync().awaitRunning(); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java index 2e7f287..a70f0d7 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeter.java @@ -17,211 +17,88 @@ package org.apache.rocketmq.client.java.metrics; -import io.grpc.ManagedChannel; -import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; -import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; -import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; -import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; -import io.opentelemetry.api.common.Attributes; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.base.MoreObjects; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.metrics.InstrumentSelector; -import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.View; -import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; -import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; -import java.net.InetSocketAddress; -import java.time.Duration; -import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import javax.net.ssl.SSLException; -import org.apache.rocketmq.client.apis.consumer.PushConsumer; -import org.apache.rocketmq.client.java.impl.ClientImpl; import org.apache.rocketmq.client.java.route.Endpoints; -import org.apache.rocketmq.client.java.rpc.AuthInterceptor; -import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MessageMeter { + static MessageMeter DISABLED = new MessageMeter(); private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeter.class); - private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3); - private static final Duration METRIC_READER_INTERVAL = Duration.ofMinutes(1); - private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message"; - - private final ClientImpl client; - - private volatile Meter meter; - private volatile Endpoints metricEndpoints; - private volatile SdkMeterProvider provider; - - private volatile MessageCacheObserver messageCacheObserver; - + private final boolean enabled; + private final Meter meter; + private final Endpoints endpoints; + private final SdkMeterProvider provider; private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap; - public MessageMeter(ClientImpl client) { - this.client = client; + public MessageMeter(Meter meter, Endpoints endpoints, SdkMeterProvider provider) { + this.enabled = true; + this.meter = checkNotNull(meter, "meter should not be null"); + this.endpoints = checkNotNull(endpoints, "endpoints should not be null"); + this.provider = checkNotNull(provider, "provider should not be null"); this.histogramMap = new ConcurrentHashMap<>(); - this.client.registerMessageInterceptor(new MetricMessageInterceptor(this)); - this.messageCacheObserver = null; } - public void setMessageCacheObserver(MessageCacheObserver messageCacheObserver) { - this.messageCacheObserver = messageCacheObserver; + private MessageMeter() { + this.enabled = false; + this.meter = null; + this.endpoints = null; + this.provider = null; + this.histogramMap = new ConcurrentHashMap<>(); } - DoubleHistogram getHistogramByName(MetricName metricName) { - return histogramMap.computeIfAbsent(metricName, name -> meter.histogramBuilder(name.getName()).build()); + public boolean isEnabled() { + return enabled; } - public synchronized void refresh(Metric metric) { - final String clientId = client.getClientId(); - try { - if (!metric.isOn()) { - LOGGER.info("Skip metric refresh because metric is off, clientId={}", clientId); - shutdown(); - return; - } - final Optional<Endpoints> optionalEndpoints = metric.tryGetMetricEndpoints(); - if (!optionalEndpoints.isPresent()) { - LOGGER.error("[Bug] Metric switch is on but endpoints is not filled, clientId={}", - clientId); - return; - } - final Endpoints existedEndpoints = metricEndpoints; - final Endpoints newMetricEndpoints = optionalEndpoints.get(); - if (newMetricEndpoints.equals(metricEndpoints)) { - LOGGER.debug("Message metric exporter endpoints remains the same, clientId={}, endpoints={}", - clientId, newMetricEndpoints); - return; - } - this.reset(newMetricEndpoints); - LOGGER.info("Message meter endpoints is updated, clientId={}, {} => {}", clientId, existedEndpoints, - newMetricEndpoints); - } catch (Throwable t) { - LOGGER.error("Exception raised while refreshing message meter, clientId={}", clientId, t); - } + public Endpoints getEndpoints() { + return endpoints; + } + + Optional<DoubleHistogram> getHistogramByName(MetricName metricName) { + final DoubleHistogram histogram = histogramMap.computeIfAbsent(metricName, name -> enabled ? + meter.histogramBuilder(name.getName()).build() : null); + return null == histogram ? Optional.empty() : Optional.of(histogram); } - public synchronized void shutdown() { - if (null == provider) { + public void shutdown() { + if (!enabled) { return; } - final String clientId = client.getClientId(); - LOGGER.info("Begin to shutdown the message meter, clientId={}", clientId); final CountDownLatch latch = new CountDownLatch(1); provider.shutdown().whenComplete(latch::countDown); try { latch.await(); } catch (Throwable t) { - LOGGER.error("Exception raised while waiting for the shutdown of meter, clientId={}", clientId); + LOGGER.error("Failed to shutdown message meter, endpoints={}", endpoints, t); } - LOGGER.info("Shutdown the message meter, clientId={}", clientId); - // Clear endpoints. - metricEndpoints = null; - // Clear meter. - meter = null; - // Clear provider. - provider = null; - } - - public Meter getMeter() { - return meter; } - public ClientImpl getClient() { - return client; - } - - @SuppressWarnings("deprecation") - private void reset(Endpoints newMetricEndpoints) throws SSLException { - final String clientId = client.getClientId(); - final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE) - .build(); - final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(newMetricEndpoints.getGrpcTarget()) - .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId)); - final List<InetSocketAddress> socketAddresses = newMetricEndpoints.toSocketAddresses(); - if (null != socketAddresses) { - IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses); - channelBuilder.nameResolverFactory(metricResolverFactory); + public boolean satisfy(Metric metric) { + if (enabled && metric.isOn() && endpoints.equals(metric.getEndpoints())) { + return true; } - ManagedChannel channel = channelBuilder.build(); - - OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel) - .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT) - .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred()) - .build(); - - InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder() - .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build(); - final View sendSuccessCostTimeView = View.builder() - .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build(); - - InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder() - .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build(); - final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET) - .build(); - - InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder() - .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build(); - final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build(); - - InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder() - .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build(); - final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build(); - - PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter) - .setInterval(METRIC_READER_INTERVAL).build(); - - final SdkMeterProvider newProvider = SdkMeterProvider.builder().registerMetricReader(reader) - .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView) - .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView) - .registerView(awaitTimeInstrumentSelector, awaitTimeView) - .registerView(processTimeInstrumentSelector, processTimeView) - .build(); + return !enabled && !metric.isOn(); + } - final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(newProvider).build(); - meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME); - shutdown(); - // Force clean existed histogram. - histogramMap.clear(); - final ClientImpl client = this.getClient(); - if (!(client instanceof PushConsumer)) { - // No need for producer and simple consumer. - return; - } - final String consumerGroup = ((PushConsumer) client).getConsumerGroup(); - meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> { - final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount(); - for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) { - final String topic = entry.getKey(); - Attributes attributes = Attributes.builder() - .put(MetricLabels.TOPIC, topic) - .put(MetricLabels.CONSUMER_GROUP, consumerGroup) - .put(MetricLabels.CLIENT_ID, clientId).build(); - measurement.record(entry.getValue(), attributes); - } - }); - meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement -> { - final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes(); - for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) { - final String topic = entry.getKey(); - Attributes attributes = Attributes.builder() - .put(MetricLabels.TOPIC, topic) - .put(MetricLabels.CONSUMER_GROUP, consumerGroup) - .put(MetricLabels.CLIENT_ID, clientId).build(); - measurement.record(entry.getValue(), attributes); - } - }); - this.provider = newProvider; - this.metricEndpoints = newMetricEndpoints; + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("enabled", enabled) + .add("meter", meter) + .add("metricEndpoints", endpoints) + .add("provider", provider) + .add("histogramMap", histogramMap) + .toString(); } } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java new file mode 100644 index 0000000..6a1b548 --- /dev/null +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterProvider.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.java.metrics; + +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.InstrumentSelector; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.View; +import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.rocketmq.client.apis.consumer.PushConsumer; +import org.apache.rocketmq.client.java.impl.ClientImpl; +import org.apache.rocketmq.client.java.route.Endpoints; +import org.apache.rocketmq.client.java.rpc.AuthInterceptor; +import org.apache.rocketmq.client.java.rpc.IpNameResolverFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MessageMeterProvider { + private static final Logger LOGGER = LoggerFactory.getLogger(MessageMeterProvider.class); + + private static final Duration METRIC_EXPORTER_RPC_TIMEOUT = Duration.ofSeconds(3); + private static final Duration METRIC_READER_INTERVAL = Duration.ofSeconds(1); + private static final String METRIC_INSTRUMENTATION_NAME = "org.apache.rocketmq.message"; + + private final ClientImpl client; + + private volatile MessageMeter messageMeter; + + private volatile MessageCacheObserver messageCacheObserver; + + public MessageMeterProvider(ClientImpl client) { + this.client = client; + this.client.registerMessageInterceptor(new MetricMessageInterceptor(this)); + this.messageMeter = MessageMeter.DISABLED; + this.messageCacheObserver = null; + } + + public void setMessageCacheObserver(MessageCacheObserver messageCacheObserver) { + this.messageCacheObserver = messageCacheObserver; + } + + Optional<DoubleHistogram> getHistogramByName(MetricName metricName) { + return messageMeter.getHistogramByName(metricName); + } + + public synchronized void reset(Metric metric) { + final String clientId = client.getClientId(); + try { + if (messageMeter.satisfy(metric)) { + LOGGER.debug("Metric settings is satisfied by the current message meter, clientId={}", clientId); + return; + } + if (!metric.isOn()) { + LOGGER.debug("Metric is off, clientId={}", clientId); + messageMeter.shutdown(); + messageMeter = MessageMeter.DISABLED; + return; + } + final Endpoints endpoints = metric.getEndpoints(); + final SslContext sslContext = GrpcSslContexts.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE) + .build(); + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(endpoints.getGrpcTarget()) + .sslContext(sslContext).intercept(new AuthInterceptor(client.getClientConfiguration(), clientId)); + final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses(); + if (null != socketAddresses) { + IpNameResolverFactory metricResolverFactory = new IpNameResolverFactory(socketAddresses); + channelBuilder.nameResolverFactory(metricResolverFactory); + } + ManagedChannel channel = channelBuilder.build(); + OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder().setChannel(channel) + .setTimeout(METRIC_EXPORTER_RPC_TIMEOUT) + .setAggregationTemporalitySelector(AggregationTemporalitySelector.deltaPreferred()) + .build(); + + InstrumentSelector sendSuccessCostTimeInstrumentSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build(); + final View sendSuccessCostTimeView = View.builder() + .setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build(); + + InstrumentSelector deliveryLatencyInstrumentSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build(); + final View deliveryLatencyView = View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET) + .build(); + + InstrumentSelector awaitTimeInstrumentSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build(); + final View awaitTimeView = View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build(); + + InstrumentSelector processTimeInstrumentSelector = InstrumentSelector.builder() + .setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build(); + final View processTimeView = View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build(); + + PeriodicMetricReader reader = PeriodicMetricReader.builder(exporter) + .setInterval(METRIC_READER_INTERVAL).build(); + + final SdkMeterProvider provider = SdkMeterProvider.builder().registerMetricReader(reader) + .registerView(sendSuccessCostTimeInstrumentSelector, sendSuccessCostTimeView) + .registerView(deliveryLatencyInstrumentSelector, deliveryLatencyView) + .registerView(awaitTimeInstrumentSelector, awaitTimeView) + .registerView(processTimeInstrumentSelector, processTimeView) + .build(); + + final OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(provider).build(); + Meter meter = openTelemetry.getMeter(METRIC_INSTRUMENTATION_NAME); + + // Reset message meter. + MessageMeter existedMessageMeter = messageMeter; + messageMeter = new MessageMeter(meter, endpoints, provider); + existedMessageMeter.shutdown(); + LOGGER.info("Metrics is on, endpoints={}, clientId={}", endpoints, clientId); + + if (!(client instanceof PushConsumer)) { + // No need for producer and simple consumer. + return; + } + final String consumerGroup = ((PushConsumer) client).getConsumerGroup(); + meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement -> { + final Map<String, Long> cachedMessageCountMap = messageCacheObserver.getCachedMessageCount(); + for (Map.Entry<String, Long> entry : cachedMessageCountMap.entrySet()) { + final String topic = entry.getKey(); + Attributes attributes = Attributes.builder() + .put(MetricLabels.TOPIC, topic) + .put(MetricLabels.CONSUMER_GROUP, consumerGroup) + .put(MetricLabels.CLIENT_ID, clientId).build(); + measurement.record(entry.getValue(), attributes); + } + }); + meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement -> { + final Map<String, Long> cachedMessageBytesMap = messageCacheObserver.getCachedMessageBytes(); + for (Map.Entry<String, Long> entry : cachedMessageBytesMap.entrySet()) { + final String topic = entry.getKey(); + Attributes attributes = Attributes.builder() + .put(MetricLabels.TOPIC, topic) + .put(MetricLabels.CONSUMER_GROUP, consumerGroup) + .put(MetricLabels.CLIENT_ID, clientId).build(); + measurement.record(entry.getValue(), attributes); + } + }); + } catch (Throwable t) { + LOGGER.error("Exception raised when resetting message meter, clientId={}", clientId, t); + } + } + + public boolean isEnabled() { + return messageMeter.isEnabled(); + } + + public ClientImpl getClient() { + return client; + } +} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java index 7099a04..0cb7224 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/Metric.java @@ -17,20 +17,19 @@ package org.apache.rocketmq.client.java.metrics; -import java.util.Optional; import org.apache.rocketmq.client.java.route.Endpoints; public class Metric { - private final Endpoints metricEndpoints; + private final Endpoints endpoints; private final boolean on; public Metric(apache.rocketmq.v2.Metric metric) { - this.metricEndpoints = metric.hasEndpoints() ? new Endpoints(metric.getEndpoints()) : null; - this.on = metric.getOn(); + this.endpoints = metric.hasEndpoints() ? new Endpoints(metric.getEndpoints()) : null; + this.on = metric.getOn() && metric.hasEndpoints(); } - public Optional<Endpoints> tryGetMetricEndpoints() { - return null == metricEndpoints ? Optional.empty() : Optional.of(metricEndpoints); + public Endpoints getEndpoints() { + return endpoints; } public boolean isOn() { diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java index 2de9c53..a798455 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricMessageInterceptor.java @@ -21,7 +21,6 @@ import com.google.protobuf.Timestamp; import com.google.protobuf.util.Timestamps; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.DoubleHistogram; -import io.opentelemetry.api.metrics.Meter; import java.time.Duration; import java.util.List; import java.util.Optional; @@ -38,20 +37,25 @@ import org.slf4j.LoggerFactory; public class MetricMessageInterceptor implements MessageInterceptor { private static final Logger LOGGER = LoggerFactory.getLogger(MetricMessageInterceptor.class); - private final MessageMeter messageMeter; + private final MessageMeterProvider messageMeterProvider; - public MetricMessageInterceptor(MessageMeter messageMeter) { - this.messageMeter = messageMeter; + public MetricMessageInterceptor(MessageMeterProvider messageMeterProvider) { + this.messageMeterProvider = messageMeterProvider; } private void doAfterSendMessage(List<MessageCommon> messageCommons, Duration duration, MessageHookPointsStatus status) { - final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME); + final Optional<DoubleHistogram> optionalHistogram = + messageMeterProvider.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME); + if (!optionalHistogram.isPresent()) { + return; + } + final DoubleHistogram histogram = optionalHistogram.get(); for (MessageCommon messageCommon : messageCommons) { InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS : InvocationStatus.FAILURE; Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) - .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()) + .put(MetricLabels.CLIENT_ID, messageMeterProvider.getClient().getClientId()) .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build(); histogram.record(duration.toMillis(), attributes); } @@ -61,7 +65,7 @@ public class MetricMessageInterceptor implements MessageInterceptor { if (messageCommons.isEmpty()) { return; } - final ClientImpl client = messageMeter.getClient(); + final ClientImpl client = messageMeterProvider.getClient(); String consumerGroup = null; if (client instanceof PushConsumer) { consumerGroup = ((PushConsumer) client).getConsumerGroup(); @@ -80,7 +84,12 @@ public class MetricMessageInterceptor implements MessageInterceptor { } final Timestamp deliveryTimestampFromRemote = optionalDeliveryTimestampFromRemote.get(); final long latency = System.currentTimeMillis() - Timestamps.toMillis(deliveryTimestampFromRemote); - final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.DELIVERY_LATENCY); + final Optional<DoubleHistogram> optionalHistogram = + messageMeterProvider.getHistogramByName(MetricName.DELIVERY_LATENCY); + if (!optionalHistogram.isPresent()) { + return; + } + final DoubleHistogram histogram = optionalHistogram.get(); final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) .put(MetricLabels.CONSUMER_GROUP, consumerGroup) .put(MetricLabels.CLIENT_ID, client.getClientId()).build(); @@ -88,7 +97,7 @@ public class MetricMessageInterceptor implements MessageInterceptor { } private void doBeforeConsumeMessage(List<MessageCommon> messageCommons) { - final ClientImpl client = messageMeter.getClient(); + final ClientImpl client = messageMeterProvider.getClient(); String consumerGroup = null; if (client instanceof PushConsumer) { consumerGroup = ((PushConsumer) client).getConsumerGroup(); @@ -106,14 +115,18 @@ public class MetricMessageInterceptor implements MessageInterceptor { Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) .put(MetricLabels.CONSUMER_GROUP, consumerGroup) .put(MetricLabels.CLIENT_ID, client.getClientId()).build(); - final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.AWAIT_TIME); + final Optional<DoubleHistogram> optionalHistogram = + messageMeterProvider.getHistogramByName(MetricName.AWAIT_TIME); + if (!optionalHistogram.isPresent()) { + return; + } + final DoubleHistogram histogram = optionalHistogram.get(); histogram.record(durationAfterDecoding.toMillis(), attributes); } private void doAfterProcessMessage(List<MessageCommon> messageCommons, Duration duration, MessageHookPointsStatus status) { - final DoubleHistogram histogram = messageMeter.getHistogramByName(MetricName.PROCESS_TIME); - final ClientImpl client = messageMeter.getClient(); + final ClientImpl client = messageMeterProvider.getClient(); if (!(client instanceof PushConsumer)) { // Should never reach here. LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.getClientId()); @@ -125,17 +138,22 @@ public class MetricMessageInterceptor implements MessageInterceptor { InvocationStatus.FAILURE; Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic()) .put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup()) - .put(MetricLabels.CLIENT_ID, messageMeter.getClient().getClientId()) + .put(MetricLabels.CLIENT_ID, messageMeterProvider.getClient().getClientId()) .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()) .build(); + final Optional<DoubleHistogram> optionalHistogram = + messageMeterProvider.getHistogramByName(MetricName.PROCESS_TIME); + if (!optionalHistogram.isPresent()) { + return; + } + final DoubleHistogram histogram = optionalHistogram.get(); histogram.record(duration.toMillis(), attributes); } } @Override public void doBefore(MessageHookPoints messageHookPoints, List<MessageCommon> messageCommons) { - final Meter meter = messageMeter.getMeter(); - if (null == meter) { + if (!messageMeterProvider.isEnabled()) { return; } if (MessageHookPoints.CONSUME.equals(messageHookPoints)) { @@ -146,8 +164,7 @@ public class MetricMessageInterceptor implements MessageInterceptor { @Override public void doAfter(MessageHookPoints messageHookPoints, List<MessageCommon> messageCommons, Duration duration, MessageHookPointsStatus status) { - final Meter meter = messageMeter.getMeter(); - if (null == meter) { + if (!messageMeterProvider.isEnabled()) { return; } switch (messageHookPoints) {
