This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 4f8af39 Refactor client telemetry (#44)
4f8af39 is described below
commit 4f8af3976e34bbe5c21ded4621b2c1f62f62eeb6
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Jul 14 10:33:08 2022 +0800
Refactor client telemetry (#44)
* Refactor client telemetry
* Polish code
---
.../java/exception/InternalErrorException.java | 4 +
.../apache/rocketmq/client/java/impl/Client.java | 6 +-
.../rocketmq/client/java/impl/ClientImpl.java | 105 +++++++++++-----
.../rocketmq/client/java/impl/ClientManager.java | 10 ++
.../client/java/impl/ClientManagerImpl.java | 8 +-
.../client/java/impl/ClientManagerRegistry.java | 4 +-
...elemetrySession.java => ClientSessionImpl.java} | 138 +++++++++------------
.../java/impl/consumer/ProcessQueueImpl.java | 36 +++---
.../java/impl/producer/ClientSessionProcessor.java | 47 +++++++
.../rocketmq/client/java/metrics/ClientMeter.java | 8 +-
.../client/java/metrics/ClientMeterProvider.java | 31 ++---
.../metrics/{MetricName.java => GaugeEnum.java} | 31 +----
.../client/java/metrics/HistogramBuckets.java | 47 -------
.../{MetricName.java => HistogramEnum.java} | 45 ++++---
.../java/metrics/MessageMeterInterceptor.java | 22 ++--
.../java/impl/consumer/PushConsumerImplTest.java | 4 +-
.../java/impl/consumer/SimpleConsumerImplTest.java | 4 +-
.../java/impl/producer/ProducerImplTest.java | 12 +-
18 files changed, 293 insertions(+), 269 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index e1a44a4..f42d369 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -27,4 +27,8 @@ public class InternalErrorException extends ClientException {
public InternalErrorException(int responseCode, String message) {
super(responseCode, message);
}
+
+ public InternalErrorException(Throwable cause) {
+ super(cause);
+ }
}
\ No newline at end of file
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
index b106db3..b0ac5f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
@@ -25,7 +25,7 @@ public interface Client {
*
* @return unique client identifier.
*/
- String getClientId();
+ String clientId();
/**
* Send heart beat to remote {@link Endpoints}.
@@ -33,9 +33,9 @@ public interface Client {
void doHeartbeat();
/**
- * Voluntary announce settings to remote.
+ * Sync settings to remote.
*/
- void telemeterSettings();
+ void syncSettings();
/**
* Do some stats for client.
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 3cf8a36..093c02d 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -42,11 +42,13 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
+import io.grpc.stub.StreamObserver;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -67,10 +69,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
import org.apache.rocketmq.client.java.metrics.Metric;
@@ -86,10 +90,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public abstract class ClientImpl extends AbstractIdleService implements
Client, MessageInterceptor {
+public abstract class ClientImpl extends AbstractIdleService implements
Client, ClientSessionProcessor,
+ MessageInterceptor {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClientImpl.class);
private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP =
Duration.ofSeconds(3);
+ private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(102 *
365);
+
protected volatile ClientManager clientManager;
protected final ClientConfiguration clientConfiguration;
protected final Endpoints accessEndpoints;
@@ -99,7 +106,7 @@ public abstract class ClientImpl extends AbstractIdleService
implements Client,
protected final ExecutorService clientCallbackExecutor;
protected final ClientMeterProvider clientMeterProvider;
/**
- * Telemetry command executor, which is aims to execute commands from
remote.
+ * Telemetry command executor, which aims to execute commands from the
remote.
*/
protected final ThreadPoolExecutor telemetryCommandExecutor;
protected final String clientId;
@@ -111,9 +118,9 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
private final Map<String /* topic */,
Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable;
private final Lock inflightRouteFutureLock;
- @GuardedBy("telemetrySessionsLock")
- private final ConcurrentMap<Endpoints, TelemetrySession>
telemetrySessionTable;
- private final ReadWriteLock telemetrySessionsLock;
+ @GuardedBy("endpointsSessionsLock")
+ private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable;
+ private final ReadWriteLock endpointsSessionsLock;
@GuardedBy("messageInterceptorsLock")
private final List<MessageInterceptor> messageInterceptors;
@@ -131,8 +138,8 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
this.inflightRouteFutureTable = new ConcurrentHashMap<>();
this.inflightRouteFutureLock = new ReentrantLock();
- this.telemetrySessionTable = new ConcurrentHashMap<>();
- this.telemetrySessionsLock = new ReentrantReadWriteLock();
+ this.endpointsSessionTable = new HashMap<>();
+ this.endpointsSessionsLock = new ReentrantReadWriteLock();
this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>());
@@ -274,13 +281,39 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
}
+ @Override
+ public TelemetryCommand getSettingsCommand() {
+ final Settings settings = this.getClientSettings().toProtobuf();
+ return TelemetryCommand.newBuilder().setSettings(settings).build();
+ }
+
+ @Override
+ public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
+ StreamObserver<TelemetryCommand> observer) throws ClientException {
+ try {
+ final Metadata metadata = this.sign();
+ return clientManager.telemetry(endpoints, metadata,
TELEMETRY_TIMEOUT, observer);
+ } catch (ClientException e) {
+ throw e;
+ } catch (Throwable t) {
+ throw new InternalErrorException(t);
+ }
+ }
+
+ @Override
+ public ListenableFuture<Void> register() {
+ return Futures.transformAsync(this.getClientSettings().arrivedFuture,
+ (clientSettings) -> Futures.immediateVoidFuture(),
clientCallbackExecutor);
+ }
+
/**
* This method is invoked while request of printing thread stack trace is
received from remote.
*
* @param endpoints remote endpoints.
* @param command request of printing thread stack trace from remote.
*/
- void onPrintThreadStackCommand(Endpoints endpoints,
PrintThreadStackTraceCommand command) {
+ @Override
+ public void onPrintThreadStackTraceCommand(Endpoints endpoints,
PrintThreadStackTraceCommand command) {
final String nonce = command.getNonce();
Runnable task = () -> {
try {
@@ -314,6 +347,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* @param endpoints remote endpoints.
* @param settings settings received from remote.
*/
+ @Override
public final void onSettingsCommand(Endpoints endpoints, Settings
settings) {
final Metric metric = new Metric(settings.getMetric());
clientMeterProvider.reset(metric);
@@ -322,10 +356,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
/**
- * @see Client#telemeterSettings()
+ * @see Client#syncSettings()
*/
@Override
- public void telemeterSettings() {
+ public void syncSettings() {
final Settings settings = getClientSettings().toProtobuf();
final TelemetryCommand command =
TelemetryCommand.newBuilder().setSettings(settings).build();
final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
@@ -345,12 +379,12 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* @param command command to telemeter.
*/
public void telemeter(Endpoints endpoints, TelemetryCommand command) {
- final ListenableFuture<TelemetrySession> future =
registerTelemetrySession(endpoints);
- Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
+ final ListenableFuture<ClientSessionImpl> future =
registerTelemetrySession(endpoints);
+ Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
@Override
- public void onSuccess(TelemetrySession session) {
+ public void onSuccess(ClientSessionImpl session) {
try {
- session.telemeter(command);
+ session.publish(command);
} catch (Throwable t) {
LOGGER.error("Failed to telemeter command, endpoints={},
command={}", endpoints, command);
}
@@ -364,43 +398,44 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
private void releaseTelemetrySessions() {
- telemetrySessionsLock.readLock().lock();
+ endpointsSessionsLock.readLock().lock();
try {
- telemetrySessionTable.values().forEach(TelemetrySession::release);
+ endpointsSessionTable.values().forEach(ClientSessionImpl::release);
} finally {
- telemetrySessionsLock.readLock().unlock();
+ endpointsSessionsLock.readLock().unlock();
}
}
/**
* Try to register telemetry session, return it directly if session is
existed already.
*/
- public ListenableFuture<TelemetrySession>
registerTelemetrySession(Endpoints endpoints) {
- final SettableFuture<TelemetrySession> future0 =
SettableFuture.create();
- telemetrySessionsLock.readLock().lock();
+ public ListenableFuture<ClientSessionImpl>
registerTelemetrySession(Endpoints endpoints) {
+ final SettableFuture<ClientSessionImpl> future0 =
SettableFuture.create();
+ endpointsSessionsLock.readLock().lock();
try {
- TelemetrySession telemetrySession =
telemetrySessionTable.get(endpoints);
+ ClientSessionImpl clientSessionImpl =
endpointsSessionTable.get(endpoints);
// Return is directly if session is existed already.
- if (null != telemetrySession) {
- future0.set(telemetrySession);
+ if (null != clientSessionImpl) {
+ future0.set(clientSessionImpl);
return future0;
}
} finally {
- telemetrySessionsLock.readLock().unlock();
+ endpointsSessionsLock.readLock().unlock();
}
// Future's exception has been logged during the registration.
- final ListenableFuture<TelemetrySession> future =
TelemetrySession.register(this, clientManager, endpoints);
+ final ListenableFuture<ClientSessionImpl> future = new
ClientSessionImpl(this, endpoints).register();
return Futures.transform(future, session -> {
- telemetrySessionsLock.writeLock().lock();
+ endpointsSessionsLock.writeLock().lock();
try {
- TelemetrySession existed =
telemetrySessionTable.get(endpoints);
+ ClientSessionImpl existed =
endpointsSessionTable.get(endpoints);
if (null != existed) {
+ session.release();
return existed;
}
- telemetrySessionTable.put(endpoints, session);
+ endpointsSessionTable.put(endpoints, session);
return session;
} finally {
- telemetrySessionsLock.writeLock().unlock();
+ endpointsSessionsLock.writeLock().unlock();
}
}, MoreExecutors.directExecutor());
}
@@ -412,7 +447,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
*/
public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic,
TopicRouteDataResult topicRouteDataResult) {
- final ListenableFuture<List<TelemetrySession>> future =
+ final ListenableFuture<List<ClientSessionImpl>> future =
Futures.allAsList(topicRouteDataResult.getTopicRouteData()
.getMessageQueues().stream()
.map(mq -> mq.getBroker().getEndpoints())
@@ -420,9 +455,9 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
.stream().map(this::registerTelemetrySession)
.collect(Collectors.toList()));
SettableFuture<Void> future0 = SettableFuture.create();
- Futures.addCallback(future, new
FutureCallback<List<TelemetrySession>>() {
+ Futures.addCallback(future, new
FutureCallback<List<ClientSessionImpl>>() {
@Override
- public void onSuccess(List<TelemetrySession> sessions) {
+ public void onSuccess(List<ClientSessionImpl> sessions) {
LOGGER.info("Register session successfully, current route will
be cached, topic={}, "
+ "topicRouteDataResult={}", topic, topicRouteDataResult);
final TopicRouteDataResult old =
topicRouteResultCache.put(topic, topicRouteDataResult);
@@ -459,6 +494,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* @param endpoints remote endpoints.
* @param command request of message consume verification from remote.
*/
+ @Override
public void onVerifyMessageCommand(Endpoints endpoints,
VerifyMessageCommand command) {
LOGGER.warn("Ignore verify message command from remote, which is not
expected, clientId={}, command={}",
clientId, command);
@@ -482,6 +518,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
* @param endpoints remote endpoints.
* @param command request of orphaned transaction recovery from remote.
*/
+ @Override
public void onRecoverOrphanedTransactionCommand(Endpoints endpoints,
RecoverOrphanedTransactionCommand command) {
LOGGER.warn("Ignore orphaned transaction recovery command from remote,
which is not expected, client id={}, "
+ "command={}", clientId, command);
@@ -532,10 +569,10 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
/**
- * @see Client#getClientId()
+ * @see Client#clientId()
*/
@Override
- public String getClientId() {
+ public String clientId() {
return clientId;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 9901ac9..b61e315 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -205,6 +205,16 @@ public interface ClientManager {
ListenableFuture<InvocationContext<NotifyClientTerminationResponse>>
notifyClientTermination(Endpoints endpoints,
Metadata metadata, NotifyClientTerminationRequest request, Duration
duration);
+ /**
+ * Establish telemetry session stream to server.
+ *
+ * @param endpoints request endpoints.
+ * @param metadata gRPC request header metadata.
+ * @param duration stream max duration.
+ * @param responseObserver response observer.
+ * @return request observer.
+ * @throws ClientException if failed to establish telemetry session stream.
+ */
StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata
metadata,
Duration duration, StreamObserver<TelemetryCommand> responseObserver)
throws ClientException;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 337edcc..b564634 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -131,12 +131,12 @@ public class ClientManagerImpl extends
AbstractIdleService implements ClientMana
@Override
public void registerClient(Client client) {
- clientTable.put(client.getClientId(), client);
+ clientTable.put(client.clientId(), client);
}
@Override
public void unregisterClient(Client client) {
- clientTable.remove(client.getClientId());
+ clientTable.remove(client.clientId());
}
@Override
@@ -189,9 +189,9 @@ public class ClientManagerImpl extends AbstractIdleService
implements ClientMana
private void syncSettings() {
clientTable.values().forEach(client -> {
try {
- client.telemeterSettings();
+ client.syncSettings();
} catch (Throwable t) {
- LOGGER.error("Failed to announce settings, clientId={}",
client.getClientId(), t);
+ LOGGER.error("Failed to announce settings, clientId={}",
client.clientId(), t);
}
});
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
index 42dc30d..80299da 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
@@ -57,7 +57,7 @@ public class ClientManagerRegistry {
clientManager.startAsync().awaitRunning();
singletonClientManager = clientManager;
}
- clientIds.add(client.getClientId());
+ clientIds.add(client.clientId());
singletonClientManager.registerClient(client);
return singletonClientManager;
} finally {
@@ -77,7 +77,7 @@ public class ClientManagerRegistry {
ClientManagerImpl clientManager = null;
clientIdsLock.lock();
try {
- clientIds.remove(client.getClientId());
+ clientIds.remove(client.clientId());
singletonClientManager.unregisterClient(client);
if (clientIds.isEmpty()) {
clientManager = singletonClientManager;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
similarity index 54%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 36d74a5..d6a19c2 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -26,14 +26,11 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
-import java.io.UnsupportedEncodingException;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,51 +39,39 @@ import org.slf4j.LoggerFactory;
* Telemetry session is constructed before first communication between client
and remote route endpoints.
*/
@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public class TelemetrySession implements StreamObserver<TelemetryCommand> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(TelemetrySession.class);
+public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ClientSessionImpl.class);
- private final ClientImpl client;
- private final ClientManager clientManager;
+ private final ClientSessionProcessor processor;
private final Endpoints endpoints;
- private volatile StreamObserver<TelemetryCommand> requestObserver;
+ private final ReadWriteLock observerLock;
+ private StreamObserver<TelemetryCommand> requestObserver = null;
- private TelemetrySession(ClientImpl client, ClientManager clientManager,
Endpoints endpoints) {
- this.client = client;
- this.clientManager = clientManager;
+ protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints
endpoints) {
+ this.processor = processor;
this.endpoints = endpoints;
+ this.observerLock = new ReentrantReadWriteLock();
}
- public static ListenableFuture<TelemetrySession> register(ClientImpl
client, ClientManager clientManager,
- Endpoints endpoints) {
- return new TelemetrySession(client, clientManager,
endpoints).register();
- }
-
- private ListenableFuture<TelemetrySession> register() {
- ListenableFuture<TelemetrySession> future;
+ protected ListenableFuture<ClientSessionImpl> register() {
+ ListenableFuture<ClientSessionImpl> future;
try {
- this.init();
- final ClientSettings clientSettings = client.getClientSettings();
- final Settings settings = clientSettings.toProtobuf();
- final TelemetryCommand settingsCommand =
TelemetryCommand.newBuilder().setSettings(settings).build();
- this.telemeter(settingsCommand);
- future = Futures.transform(clientSettings.getArrivedFuture(),
input -> this,
- MoreExecutors.directExecutor());
+ final TelemetryCommand command = processor.getSettingsCommand();
+ this.publish(command);
+ future = Futures.transform(processor.register(), input -> this,
MoreExecutors.directExecutor());
} catch (Throwable t) {
- SettableFuture<TelemetrySession> future0 = SettableFuture.create();
- future0.setException(t);
- future = future0;
+ future = Futures.immediateFailedFuture(t);
}
- Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
+ final String clientId = processor.clientId();
+ Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
@Override
- public void onSuccess(TelemetrySession session) {
- LOGGER.info("Register telemetry session successfully,
endpoints={}, clientId={}", endpoints,
- client.getClientId());
+ public void onSuccess(ClientSessionImpl session) {
+ LOGGER.info("Register client session successfully,
endpoints={}, clientId={}", endpoints, clientId);
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Failed to register telemetry session,
endpoints={}, clientId={}", endpoints,
- client.getClientId(), t);
+ LOGGER.error("Failed to register client session, endpoints={},
clientId={}", endpoints, clientId, t);
release();
}
}, MoreExecutors.directExecutor());
@@ -96,31 +81,19 @@ public class TelemetrySession implements
StreamObserver<TelemetryCommand> {
/**
* Release telemetry session.
*/
- public synchronized void release() {
+ public void release() {
+ this.observerLock.writeLock().lock();
try {
if (null != requestObserver) {
- requestObserver.onCompleted();
+ try {
+ requestObserver.onCompleted();
+ } catch (Throwable ignore) {
+ // Ignore exception on purpose.
+ }
+ requestObserver = null;
}
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
- }
- }
-
- /**
- * Initialize telemetry session.
- */
- private synchronized void init() throws UnsupportedEncodingException,
NoSuchAlgorithmException,
- InvalidKeyException, ClientException {
- this.release();
- final Metadata metadata = client.sign();
- this.requestObserver = clientManager.telemetry(endpoints, metadata,
Duration.ofNanos(Long.MAX_VALUE), this);
- }
-
- private void reinit() {
- try {
- init();
- } catch (Throwable ignore) {
- // Ignore exception on purpose.
+ } finally {
+ this.observerLock.writeLock().unlock();
}
}
@@ -129,13 +102,24 @@ public class TelemetrySession implements
StreamObserver<TelemetryCommand> {
*
* @param command appointed command to telemeter
*/
- public void telemeter(TelemetryCommand command) {
+ public void publish(TelemetryCommand command) throws ClientException {
+ this.observerLock.readLock().lock();
try {
+ if (null != requestObserver) {
+ requestObserver.onNext(command);
+ return;
+ }
+ } finally {
+ this.observerLock.readLock().unlock();
+ }
+ this.observerLock.writeLock().lock();
+ try {
+ if (null == requestObserver) {
+ this.requestObserver = processor.telemetry(endpoints, this);
+ }
requestObserver.onNext(command);
- } catch (RuntimeException e) {
- // Cancel RPC.
- requestObserver.onError(e);
- throw e;
+ } finally {
+ this.observerLock.writeLock().unlock();
}
}
@@ -146,52 +130,52 @@ public class TelemetrySession implements
StreamObserver<TelemetryCommand> {
case SETTINGS: {
final Settings settings = command.getSettings();
LOGGER.info("Receive settings from remote, endpoints={},
clientId={}", endpoints,
- client.getClientId());
- client.onSettingsCommand(endpoints, settings);
+ processor.clientId());
+ processor.onSettingsCommand(endpoints, settings);
break;
}
case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
final RecoverOrphanedTransactionCommand
recoverOrphanedTransactionCommand =
command.getRecoverOrphanedTransactionCommand();
LOGGER.info("Receive orphaned transaction recovery command
from remote, endpoints={}, "
- + "clientId={}", endpoints, client.getClientId());
- client.onRecoverOrphanedTransactionCommand(endpoints,
recoverOrphanedTransactionCommand);
+ + "clientId={}", endpoints, processor.clientId());
+ processor.onRecoverOrphanedTransactionCommand(endpoints,
recoverOrphanedTransactionCommand);
break;
}
case VERIFY_MESSAGE_COMMAND: {
final VerifyMessageCommand verifyMessageCommand =
command.getVerifyMessageCommand();
LOGGER.info("Receive message verification command from
remote, endpoints={}, clientId={}",
- endpoints, client.getClientId());
- client.onVerifyMessageCommand(endpoints,
verifyMessageCommand);
+ endpoints, processor.clientId());
+ processor.onVerifyMessageCommand(endpoints,
verifyMessageCommand);
break;
}
case PRINT_THREAD_STACK_TRACE_COMMAND: {
final PrintThreadStackTraceCommand
printThreadStackTraceCommand =
command.getPrintThreadStackTraceCommand();
LOGGER.info("Receive thread stack print command from
remote, endpoints={}, clientId={}",
- endpoints, client.getClientId());
- client.onPrintThreadStackCommand(endpoints,
printThreadStackTraceCommand);
+ endpoints, processor.clientId());
+ processor.onPrintThreadStackTraceCommand(endpoints,
printThreadStackTraceCommand);
break;
}
default:
LOGGER.warn("Receive unrecognized command from remote,
endpoints={}, command={}, clientId={}",
- endpoints, command, client.getClientId());
+ endpoints, command, processor.clientId());
}
} catch (Throwable t) {
LOGGER.error("[Bug] unexpected exception raised while receiving
command from remote, command={}, "
- + "clientId={}", command, client.getClientId(), t);
+ + "clientId={}", command, processor.clientId(), t);
}
}
@Override
public void onError(Throwable throwable) {
LOGGER.error("Exception raised from stream response observer,
clientId={}, endpoints={}",
- client.getClientId(), endpoints, throwable);
- reinit();
+ processor.clientId(), endpoints, throwable);
+ this.release();
}
@Override
public void onCompleted() {
- reinit();
+ this.release();
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index c914dce..bdc6a6d 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -128,7 +128,7 @@ class ProcessQueueImpl implements ProcessQueue {
return false;
}
LOGGER.warn("Process queue is idle, idleDuration={},
maxIdleDuration={}, mq={}, clientId={}", idleDuration,
- maxIdleDuration, mq, consumer.getClientId());
+ maxIdleDuration, mq, consumer.clientId());
return true;
}
@@ -157,12 +157,12 @@ class ProcessQueueImpl implements ProcessQueue {
final MessageId messageId = messageView.getMessageId();
if (consumer.getPushConsumerSettings().isFifo()) {
LOGGER.error("Message is corrupted, forward it to dead
letter queue in fifo mode, mq={}, " +
- "messageId={}, clientId={}", mq, messageId,
consumer.getClientId());
+ "messageId={}, clientId={}", mq, messageId,
consumer.clientId());
forwardToDeadLetterQueue(messageView);
return;
}
LOGGER.error("Message is corrupted, nack it in standard mode,
mq={}, messageId={}, clientId={}", mq,
- messageId, consumer.getClientId());
+ messageId, consumer.clientId());
nackMessage(messageView);
});
}
@@ -194,7 +194,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
// Should never reach here.
LOGGER.error("[Bug] Failed to schedule receive message request,
mq={}, clientId={}", mq,
- consumer.getClientId(), t);
+ consumer.clientId(), t);
receiveMessageLater();
}
}
@@ -202,12 +202,12 @@ class ProcessQueueImpl implements ProcessQueue {
public void receiveMessage() {
if (dropped) {
LOGGER.info("Process queue has been dropped, no longer receive
message, mq={}, clientId={}", mq,
- consumer.getClientId());
+ consumer.clientId());
return;
}
if (this.isCacheFull()) {
LOGGER.warn("Process queue cache is full, would receive message
later, mq={}, clientId={}", mq,
- consumer.getClientId());
+ consumer.clientId());
receiveMessageLater();
return;
}
@@ -217,7 +217,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void receiveMessageImmediately() {
if (!consumer.isRunning()) {
LOGGER.info("Stop to receive message because consumer is not
running, mq={}, clientId={}", mq,
- consumer.getClientId());
+ consumer.clientId());
return;
}
try {
@@ -251,7 +251,7 @@ class ProcessQueueImpl implements ProcessQueue {
// Should never reach here.
LOGGER.error("[Bug] Exception raised while handling
receive result, would receive later," +
" mq={}, endpoints={}, clientId={}",
- mq, endpoints, consumer.getClientId(), t);
+ mq, endpoints, consumer.clientId(), t);
receiveMessageLater();
}
}
@@ -264,14 +264,14 @@ class ProcessQueueImpl implements ProcessQueue {
MessageHookPointsStatus.ERROR);
LOGGER.error("Exception raised while message reception,
would receive later, mq={}, endpoints={}," +
- " clientId={}", mq, endpoints, consumer.getClientId(),
t);
+ " clientId={}", mq, endpoints, consumer.clientId(), t);
receiveMessageLater();
}
}, MoreExecutors.directExecutor());
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
LOGGER.error("Exception raised while message reception, would
receive later, mq={}, clientId={}", mq,
- consumer.getClientId(), t);
+ consumer.clientId(), t);
receiveMessageLater();
}
}
@@ -282,7 +282,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
LOGGER.warn("Process queue total cached messages quantity exceeds
the threshold, threshold={}, actual={}," +
" mq={}, clientId={}",
- cacheMessageCountThresholdPerQueue, actualMessagesQuantity,
mq, consumer.getClientId());
+ cacheMessageCountThresholdPerQueue, actualMessagesQuantity,
mq, consumer.clientId());
return true;
}
final int cacheMessageBytesThresholdPerQueue =
consumer.cacheMessageBytesThresholdPerQueue();
@@ -290,7 +290,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
LOGGER.warn("Process queue total cached messages memory exceeds
the threshold, threshold={} bytes," +
" actual={} bytes, mq={}, clientId={}",
- cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes,
mq, consumer.getClientId());
+ cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes,
mq, consumer.clientId());
return true;
}
return false;
@@ -384,7 +384,7 @@ class ProcessQueueImpl implements ProcessQueue {
}
private void ackMessage(MessageViewImpl messageView) {
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
@@ -450,7 +450,7 @@ class ProcessQueueImpl implements ProcessQueue {
int attempt = messageView.getDeliveryAttempt();
final MessageId messageId = messageView.getMessageId();
final ConsumeService service = consumer.getConsumeService();
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
if (ConsumeResult.FAILURE.equals(consumeResult) && attempt <
maxAttempts) {
final Duration nextAttemptDelay =
retryPolicy.getNextAttemptDelay(attempt);
attempt = messageView.incrementAndGetDeliveryAttempt();
@@ -482,7 +482,7 @@ class ProcessQueueImpl implements ProcessQueue {
final SettableFuture<Void> future0) {
final
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
future =
consumer.forwardMessageToDeadLetterQueue(messageView);
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
@@ -528,7 +528,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void forwardToDeadLetterQueueLater(final MessageViewImpl
messageView, final int attempt,
final SettableFuture<Void> future0) {
final MessageId messageId = messageView.getMessageId();
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
// Process queue is dropped, no need to proceed.
if (dropped) {
LOGGER.info("Process queue was dropped, give up to forward message
to dead letter queue, mq={}," +
@@ -558,7 +558,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void ackFifoMessage(final MessageViewImpl messageView, final int
attempt,
final SettableFuture<Void> future0) {
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
@@ -606,7 +606,7 @@ class ProcessQueueImpl implements ProcessQueue {
private void ackFifoMessageLater(final MessageViewImpl messageView, final
int attempt,
final SettableFuture<Void> future0) {
final MessageId messageId = messageView.getMessageId();
- final String clientId = consumer.getClientId();
+ final String clientId = consumer.clientId();
// Process queue is dropped, no need to proceed.
if (dropped) {
LOGGER.info("Process queue was dropped, give up to ack message,
mq={}, messageId={}, clientId={}",
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
new file mode 100644
index 0000000..8bdc019
--- /dev/null
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.producer;
+
+import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.TelemetryCommand;
+import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.route.Endpoints;
+
+public interface ClientSessionProcessor {
+ ListenableFuture<Void> register();
+
+ String clientId();
+
+ TelemetryCommand getSettingsCommand();
+
+ StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
StreamObserver<TelemetryCommand> observer)
+ throws ClientException;
+
+ void onSettingsCommand(Endpoints endpoints, Settings settings);
+
+ void onRecoverOrphanedTransactionCommand(Endpoints endpoints,
RecoverOrphanedTransactionCommand command);
+
+ void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand
command);
+
+ void onPrintThreadStackTraceCommand(Endpoints endpoints,
PrintThreadStackTraceCommand command);
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
index 74a442a..baa4662 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeter.java
@@ -39,7 +39,7 @@ public class ClientMeter {
private final Meter meter;
private final Endpoints endpoints;
private final SdkMeterProvider provider;
- private final ConcurrentMap<MetricName, DoubleHistogram> histogramMap;
+ private final ConcurrentMap<String /* histogram name */, DoubleHistogram>
histogramMap;
public ClientMeter(Meter meter, Endpoints endpoints, SdkMeterProvider
provider) {
this.enabled = true;
@@ -65,9 +65,9 @@ public class ClientMeter {
return endpoints;
}
- Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
- final DoubleHistogram histogram =
histogramMap.computeIfAbsent(metricName, name -> enabled ?
- meter.histogramBuilder(name.getName()).build() : null);
+ Optional<DoubleHistogram> getHistogramByEnum(HistogramEnum histogramEnum) {
+ final DoubleHistogram histogram =
histogramMap.computeIfAbsent(histogramEnum.getName(), name -> enabled ?
+ meter.histogramBuilder(histogramEnum.getName()).build() : null);
return null == histogram ? Optional.empty() : Optional.of(histogram);
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
index 9451028..16804b9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
@@ -33,6 +33,7 @@ import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.View;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
@@ -68,12 +69,12 @@ public class ClientMeterProvider {
this.messageCacheObserver = messageCacheObserver;
}
- Optional<DoubleHistogram> getHistogramByName(MetricName metricName) {
- return clientMeter.getHistogramByName(metricName);
+ Optional<DoubleHistogram> getHistogramByEnum(HistogramEnum histogramEnum) {
+ return clientMeter.getHistogramByEnum(histogramEnum);
}
public synchronized void reset(Metric metric) {
- final String clientId = client.getClientId();
+ final String clientId = client.clientId();
try {
if (clientMeter.satisfy(metric)) {
LOGGER.debug("Metric settings is satisfied by the current
message meter, clientId={}", clientId);
@@ -102,27 +103,29 @@ public class ClientMeterProvider {
.build();
InstrumentSelector sendSuccessCostTimeInstrumentSelector =
InstrumentSelector.builder()
-
.setType(InstrumentType.HISTOGRAM).setName(MetricName.SEND_SUCCESS_COST_TIME.getName()).build();
+
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.SEND_SUCCESS_COST_TIME.getName()).build();
final View sendSuccessCostTimeView = View.builder()
-
.setAggregation(HistogramBuckets.SEND_SUCCESS_COST_TIME_BUCKET).build();
+
.setAggregation(HistogramEnum.SEND_SUCCESS_COST_TIME.getBucket()).build();
InstrumentSelector deliveryLatencyInstrumentSelector =
InstrumentSelector.builder()
-
.setType(InstrumentType.HISTOGRAM).setName(MetricName.DELIVERY_LATENCY.getName()).build();
- final View deliveryLatencyView =
View.builder().setAggregation(HistogramBuckets.DELIVERY_LATENCY_BUCKET)
+
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.DELIVERY_LATENCY.getName()).build();
+ final View deliveryLatencyView =
View.builder().setAggregation(HistogramEnum.DELIVERY_LATENCY.getBucket())
.build();
InstrumentSelector awaitTimeInstrumentSelector =
InstrumentSelector.builder()
-
.setType(InstrumentType.HISTOGRAM).setName(MetricName.AWAIT_TIME.getName()).build();
- final View awaitTimeView =
View.builder().setAggregation(HistogramBuckets.AWAIT_TIME_BUCKET).build();
+
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.AWAIT_TIME.getName()).build();
+ final View awaitTimeView =
View.builder().setAggregation(HistogramEnum.AWAIT_TIME.getBucket()).build();
InstrumentSelector processTimeInstrumentSelector =
InstrumentSelector.builder()
-
.setType(InstrumentType.HISTOGRAM).setName(MetricName.PROCESS_TIME.getName()).build();
- final View processTimeView =
View.builder().setAggregation(HistogramBuckets.PROCESS_TIME_BUCKET).build();
+
.setType(InstrumentType.HISTOGRAM).setName(HistogramEnum.PROCESS_TIME.getName()).build();
+ final View processTimeView =
View.builder().setAggregation(HistogramEnum.PROCESS_TIME.getBucket()).build();
PeriodicMetricReader reader =
PeriodicMetricReader.builder(exporter)
.setInterval(METRIC_READER_INTERVAL).build();
- final SdkMeterProvider provider =
SdkMeterProvider.builder().registerMetricReader(reader)
+ final SdkMeterProvider provider = SdkMeterProvider.builder()
+ .setResource(Resource.empty())
+ .registerMetricReader(reader)
.registerView(sendSuccessCostTimeInstrumentSelector,
sendSuccessCostTimeView)
.registerView(deliveryLatencyInstrumentSelector,
deliveryLatencyView)
.registerView(awaitTimeInstrumentSelector, awaitTimeView)
@@ -143,7 +146,7 @@ public class ClientMeterProvider {
return;
}
final String consumerGroup = ((PushConsumer)
client).getConsumerGroup();
-
meter.gaugeBuilder(MetricName.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement
-> {
+
meter.gaugeBuilder(GaugeEnum.CONSUMER_CACHED_MESSAGES.getName()).buildWithCallback(measurement
-> {
final Map<String, Long> cachedMessageCountMap =
messageCacheObserver.getCachedMessageCount();
for (Map.Entry<String, Long> entry :
cachedMessageCountMap.entrySet()) {
final String topic = entry.getKey();
@@ -154,7 +157,7 @@ public class ClientMeterProvider {
measurement.record(entry.getValue(), attributes);
}
});
-
meter.gaugeBuilder(MetricName.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement
-> {
+
meter.gaugeBuilder(GaugeEnum.CONSUMER_CACHED_BYTES.getName()).buildWithCallback(measurement
-> {
final Map<String, Long> cachedMessageBytesMap =
messageCacheObserver.getCachedMessageBytes();
for (Map.Entry<String, Long> entry :
cachedMessageBytesMap.entrySet()) {
final String topic = entry.getKey();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeEnum.java
similarity index 54%
copy from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
copy to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeEnum.java
index caf2ef2..93d2834 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/GaugeEnum.java
@@ -17,13 +17,7 @@
package org.apache.rocketmq.client.java.metrics;
-public enum MetricName {
- /**
- * A histogram that records the cost time of successful api calls of
message publishing.
- *
- * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#INVOCATION_STATUS}.
- */
- SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time"),
+public enum GaugeEnum {
/**
* A gauge that records the cached message count of push consumer.
*
@@ -35,30 +29,11 @@ public enum MetricName {
*
* <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
*/
- CONSUMER_CACHED_BYTES("rocketmq_consumer_cached_bytes"),
- /**
- * A histogram that records the latency of message delivery from remote.
- *
- * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
- */
- DELIVERY_LATENCY("rocketmq_delivery_latency"),
- /**
- * A histogram that records await time of message consumption.
- *
- * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
- */
- AWAIT_TIME("rocketmq_await_time"),
- /**
- * A histogram that records the process time of message consumption.
- *
- * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP},
- * {@link MetricLabels#INVOCATION_STATUS}.
- */
- PROCESS_TIME("rocketmq_process_time");
+ CONSUMER_CACHED_BYTES("rocketmq_consumer_cached_bytes");
private final String name;
- MetricName(String name) {
+ GaugeEnum(String name) {
this.name = name;
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
deleted file mode 100644
index b09315f..0000000
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramBuckets.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.metrics;
-
-import io.opentelemetry.sdk.metrics.Aggregation;
-import java.util.Arrays;
-
-public class HistogramBuckets {
- /**
- * Histogram bucket for {@link MetricName#SEND_SUCCESS_COST_TIME}, time
unit is milliseconds.
- */
- public static final Aggregation SEND_SUCCESS_COST_TIME_BUCKET =
- Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0, 10.0,
20.0, 50.0, 200.0, 500.0));
- /**
- * Histogram bucket for {@link MetricName#DELIVERY_LATENCY}, time unit is
milliseconds.
- */
- public static final Aggregation DELIVERY_LATENCY_BUCKET =
Aggregation.explicitBucketHistogram(Arrays.asList(1.0,
- 5.0, 10.0, 20.0, 50.0, 200.0, 500.0));
- /**
- * Histogram bucket for {@link MetricName#AWAIT_TIME}, time unit is
milliseconds.
- */
- public static final Aggregation AWAIT_TIME_BUCKET =
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
- 20.0, 100.0, 1000.0, 5 * 1000.0, 10 * 1000.0));
- /**
- * Histogram bucket for {@link MetricName#PROCESS_TIME}, time unit is
milliseconds.
- */
- public static final Aggregation PROCESS_TIME_BUCKET =
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
- 10.0, 100.0, 1000.0, 10 * 1000.0, 60 * 1000.0));
-
- private HistogramBuckets() {
- }
-}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
similarity index 62%
rename from
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
rename to
java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
index caf2ef2..0fd1d07 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MetricName.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/HistogramEnum.java
@@ -17,52 +17,63 @@
package org.apache.rocketmq.client.java.metrics;
-public enum MetricName {
+import io.opentelemetry.sdk.metrics.Aggregation;
+import java.util.Arrays;
+
+public enum HistogramEnum {
/**
* A histogram that records the cost time of successful api calls of
message publishing.
*
* <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#INVOCATION_STATUS}.
- */
- SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time"),
- /**
- * A gauge that records the cached message count of push consumer.
- *
- * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
- */
- CONSUMER_CACHED_MESSAGES("rocketmq_consumer_cached_messages"),
- /**
- * A gauge that records the cached message bytes of push consumer.
*
- * <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
+ * <p>The time unit of bucket is milliseconds.
*/
- CONSUMER_CACHED_BYTES("rocketmq_consumer_cached_bytes"),
+ SEND_SUCCESS_COST_TIME("rocketmq_send_cost_time",
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+ 10.0, 20.0, 50.0, 200.0, 500.0))),
+
/**
* A histogram that records the latency of message delivery from remote.
*
* <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
+ *
+ * <p>The time unit of bucket is milliseconds.
*/
- DELIVERY_LATENCY("rocketmq_delivery_latency"),
+ DELIVERY_LATENCY("rocketmq_delivery_latency",
Aggregation.explicitBucketHistogram(Arrays.asList(1.0,
+ 5.0, 10.0, 20.0, 50.0, 200.0, 500.0))),
+
/**
* A histogram that records await time of message consumption.
*
* <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP}.
+ *
+ * <p>The time unit of bucket is milliseconds.
*/
- AWAIT_TIME("rocketmq_await_time"),
+ AWAIT_TIME("rocketmq_await_time",
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+ 20.0, 100.0, 1000.0, 5 * 1000.0, 10 * 1000.0))),
/**
* A histogram that records the process time of message consumption.
*
* <p>Labels: {@link MetricLabels#TOPIC}, {@link MetricLabels#CLIENT_ID},
{@link MetricLabels#CONSUMER_GROUP},
* {@link MetricLabels#INVOCATION_STATUS}.
+ *
+ * <p>The time unit of bucket is milliseconds.
*/
- PROCESS_TIME("rocketmq_process_time");
+ PROCESS_TIME("rocketmq_process_time",
Aggregation.explicitBucketHistogram(Arrays.asList(1.0, 5.0,
+ 10.0, 100.0, 1000.0, 10 * 1000.0, 60 * 1000.0)));
private final String name;
+ private final Aggregation bucket;
- MetricName(String name) {
+ HistogramEnum(String name, Aggregation bucket) {
this.name = name;
+ this.bucket = bucket;
}
public String getName() {
return name;
}
+
+ public Aggregation getBucket() {
+ return bucket;
+ }
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index d9bd316..3039f31 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -46,7 +46,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
private void doAfterSendMessage(List<MessageCommon> messageCommons,
Duration duration,
MessageHookPointsStatus status) {
final Optional<DoubleHistogram> optionalHistogram =
-
clientMeterProvider.getHistogramByName(MetricName.SEND_SUCCESS_COST_TIME);
+
clientMeterProvider.getHistogramByEnum(HistogramEnum.SEND_SUCCESS_COST_TIME);
if (!optionalHistogram.isPresent()) {
return;
}
@@ -55,7 +55,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
InvocationStatus invocationStatus =
MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
InvocationStatus.FAILURE;
Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
- .put(MetricLabels.CLIENT_ID,
clientMeterProvider.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID,
clientMeterProvider.getClient().clientId())
.put(MetricLabels.INVOCATION_STATUS,
invocationStatus.getName()).build();
histogram.record(duration.toMillis(), attributes);
}
@@ -74,7 +74,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
consumerGroup = ((SimpleConsumer) client).getConsumerGroup();
}
if (null == consumerGroup) {
- LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}",
client.getClientId());
+ LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}",
client.clientId());
return;
}
final MessageCommon messageCommon = messageCommons.iterator().next();
@@ -85,14 +85,14 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
final Timestamp deliveryTimestampFromRemote =
optionalDeliveryTimestampFromRemote.get();
final long latency = System.currentTimeMillis() -
Timestamps.toMillis(deliveryTimestampFromRemote);
final Optional<DoubleHistogram> optionalHistogram =
-
clientMeterProvider.getHistogramByName(MetricName.DELIVERY_LATENCY);
+
clientMeterProvider.getHistogramByEnum(HistogramEnum.DELIVERY_LATENCY);
if (!optionalHistogram.isPresent()) {
return;
}
final DoubleHistogram histogram = optionalHistogram.get();
final Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+ .put(MetricLabels.CLIENT_ID, client.clientId()).build();
histogram.record(latency, attributes);
}
@@ -103,7 +103,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
consumerGroup = ((PushConsumer) client).getConsumerGroup();
}
if (null == consumerGroup) {
- LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}",
client.getClientId());
+ LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}",
client.clientId());
return;
}
final MessageCommon messageCommon = messageCommons.iterator().next();
@@ -114,9 +114,9 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
final Duration durationAfterDecoding =
optionalDurationAfterDecoding.get();
Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC,
messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP, consumerGroup)
- .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+ .put(MetricLabels.CLIENT_ID, client.clientId()).build();
final Optional<DoubleHistogram> optionalHistogram =
- clientMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
+ clientMeterProvider.getHistogramByEnum(HistogramEnum.AWAIT_TIME);
if (!optionalHistogram.isPresent()) {
return;
}
@@ -129,7 +129,7 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
final ClientImpl client = clientMeterProvider.getClient();
if (!(client instanceof PushConsumer)) {
// Should never reach here.
- LOGGER.error("[Bug] current client is not push consumer,
clientId={}", client.getClientId());
+ LOGGER.error("[Bug] current client is not push consumer,
clientId={}", client.clientId());
return;
}
PushConsumer pushConsumer = (PushConsumer) client;
@@ -138,11 +138,11 @@ public class MessageMeterInterceptor implements
MessageInterceptor {
InvocationStatus.FAILURE;
Attributes attributes =
Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
.put(MetricLabels.CONSUMER_GROUP,
pushConsumer.getConsumerGroup())
- .put(MetricLabels.CLIENT_ID,
clientMeterProvider.getClient().getClientId())
+ .put(MetricLabels.CLIENT_ID,
clientMeterProvider.getClient().clientId())
.put(MetricLabels.INVOCATION_STATUS,
invocationStatus.getName())
.build();
final Optional<DoubleHistogram> optionalHistogram =
-
clientMeterProvider.getHistogramByName(MetricName.PROCESS_TIME);
+
clientMeterProvider.getHistogramByEnum(HistogramEnum.PROCESS_TIME);
if (!optionalHistogram.isPresent()) {
return;
}
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index a7b5357..8b88702 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -46,7 +46,7 @@ import
org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.tool.TestBase;
@@ -87,7 +87,7 @@ public class PushConsumerImplTest extends TestBase {
any(Duration.class)))
.thenReturn(okQueryRouteResponseFuture());
when(clientManager.telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class)))
+ any(ClientSessionImpl.class)))
.thenReturn(telemetryRequestObserver);
final ScheduledThreadPoolExecutor scheduler = new
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
"TestScheduler"));
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 88445d1..e6ede4e 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -59,7 +59,7 @@ import
org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -105,7 +105,7 @@ public class SimpleConsumerImplTest extends TestBase {
any(Duration.class)))
.thenReturn(future0);
when(clientManager.telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class)))
+ any(ClientSessionImpl.class)))
.thenReturn(telemetryRequestObserver);
final ScheduledThreadPoolExecutor scheduler = new
ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("TestScheduler"));
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 619538e..f069b52 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -58,7 +58,7 @@ import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.rpc.InvocationContext;
@@ -112,7 +112,7 @@ public class ProducerImplTest extends TestBase {
any(Duration.class)))
.thenReturn(future0);
when(clientManager.telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class)))
+ any(ClientSessionImpl.class)))
.thenReturn(telemetryRequestObserver);
final ScheduledThreadPoolExecutor scheduler = new
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
"TestScheduler"));
@@ -145,7 +145,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class),
- any(Duration.class), any(TelemetrySession.class));
+ any(Duration.class), any(ClientSessionImpl.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
final ListenableFuture<InvocationContext<SendMessageResponse>> future =
okSendMessageResponseFutureWithSingleEntry();
@@ -165,7 +165,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, never()).queryRoute(any(Endpoints.class),
any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, never()).telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class));
+ any(ClientSessionImpl.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
final ListenableFuture<InvocationContext<SendMessageResponse>> future =
okSendMessageResponseFutureWithSingleEntry();
@@ -177,7 +177,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class),
- any(Duration.class), any(TelemetrySession.class));
+ any(Duration.class), any(ClientSessionImpl.class));
final apache.rocketmq.v2.SendResultEntry receipt =
response.getEntriesList().iterator().next();
assertEquals(receipt.getMessageId(),
sendReceipt.getMessageId().toString());
shutdown(producerWithoutTopicBinding);
@@ -189,7 +189,7 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
- any(TelemetrySession.class));
+ any(ClientSessionImpl.class));
final ListenableFuture<InvocationContext<SendMessageResponse>> future
= failureSendMessageResponseFuture();
when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
any(Duration.class))).thenReturn(future);