This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit ac1f5dc1843b0be6a3fb9b36bbf3151ec50f2bdd
Author: Aaron Ai <yangkun....@gmail.com>
AuthorDate: Sat Jul 16 11:32:24 2022 +0800

    WIP
---
 .../rocketmq/client/java/impl/ClientImpl.java      |   6 +-
 .../rocketmq/client/java/impl/ClientManager.java   |  71 ++++-----
 .../client/java/impl/ClientManagerImpl.java        |  63 +-------
 .../client/java/impl/ClientManagerRegistry.java    |  95 ------------
 .../rocketmq/client/java/rpc/RpcClientImpl.java    |   1 -
 .../client/java/impl/ClientManagerImplTest.java    |   2 +-
 .../java/impl/consumer/PushConsumerImplTest.java   | 101 -------------
 .../java/impl/consumer/SimpleConsumerImplTest.java | 163 ---------------------
 .../java/impl/producer/ProducerImplTest.java       |   4 -
 java/pom.xml                                       |   2 +-
 10 files changed, 37 insertions(+), 471 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 044b6aa..38d5001 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -145,6 +145,8 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         this.messageInterceptors = new ArrayList<>();
         this.messageInterceptorsLock = new ReentrantReadWriteLock();
 
+        this.clientManager = new ClientManagerImpl(this);
+
         this.clientCallbackExecutor = new ThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors(),
             Runtime.getRuntime().availableProcessors(),
@@ -174,8 +176,6 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
     @Override
     protected void startUp() throws Exception {
         LOGGER.info("Begin to start the rocketmq client, clientId={}", 
clientId);
-        // Register client after client id generation.
-        this.clientManager = 
ClientManagerRegistry.getInstance().registerClient(this);
         // Fetch topic route from remote.
         LOGGER.info("Begin to fetch topic(s) route data from remote during 
client startup, clientId={}, topics={}",
             clientId, topics);
@@ -228,7 +228,7 @@ public abstract class ClientImpl extends 
AbstractIdleService implements Client,
         LOGGER.info("Begin to release telemetry sessions, clientId={}", 
clientId);
         releaseTelemetrySessions();
         LOGGER.info("Release telemetry sessions successfully, clientId={}", 
clientId);
-        ClientManagerRegistry.getInstance().unregisterClient(this);
+        clientManager.stopAsync().awaitTerminated();
         clientCallbackExecutor.shutdown();
         if (!ExecutorServices.awaitTerminated(clientCallbackExecutor)) {
             LOGGER.error("[Bug] Timeout to shutdown the client callback 
executor, clientId={}", clientId);
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index b61e315..a0d6139 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -38,6 +38,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
@@ -55,34 +56,13 @@ import 
org.apache.rocketmq.client.java.rpc.InvocationContext;
  * once {@link Client} is shut down, it must be unregistered by the client 
manager. The client manager holds the
  * connections and underlying threads, which are shared by all registered 
clients.
  */
-public interface ClientManager {
-    /**
-     * Register client.
-     *
-     * @param client client.
-     */
-    void registerClient(Client client);
-
-    /**
-     * Unregister client.
-     *
-     * @param client client.
-     */
-    void unregisterClient(Client client);
-
-    /**
-     * Returns {@code true} if manager contains no {@link Client}.
-     *
-     * @return {@code true} if this map contains no {@link Client}.
-     */
-    boolean isEmpty();
-
+public abstract class ClientManager extends AbstractIdleService {
     /**
      * Provide for the client to share the scheduler.
      *
      * @return shared scheduler.
      */
-    ScheduledExecutorService getScheduler();
+    public abstract ScheduledExecutorService getScheduler();
 
     /**
      * Query topic route asynchronously, the method ensures no throwable.
@@ -93,9 +73,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<QueryRouteResponse>> 
queryRoute(Endpoints endpoints, Metadata metadata,
-        QueryRouteRequest request,
-        Duration duration);
+    public abstract ListenableFuture<InvocationContext<QueryRouteResponse>> 
queryRoute(Endpoints endpoints,
+        Metadata metadata, QueryRouteRequest request, Duration duration);
 
     /**
      * Heart beat asynchronously, the method ensures no throwable.
@@ -106,9 +85,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints 
endpoints, Metadata metadata,
-        HeartbeatRequest request,
-        Duration duration);
+    public abstract ListenableFuture<InvocationContext<HeartbeatResponse>> 
heartbeat(Endpoints endpoints,
+        Metadata metadata, HeartbeatRequest request, Duration duration);
 
     /**
      * Send message asynchronously, the method ensures no throwable.
@@ -119,8 +97,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<SendMessageResponse>> 
sendMessage(Endpoints endpoints, Metadata metadata,
-        SendMessageRequest request, Duration duration);
+    public abstract ListenableFuture<InvocationContext<SendMessageResponse>> 
sendMessage(Endpoints endpoints,
+        Metadata metadata, SendMessageRequest request, Duration duration);
 
     /**
      * Query assignment asynchronously, the method ensures no throwable.
@@ -131,8 +109,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
queryAssignment(Endpoints endpoints, Metadata metadata,
-        QueryAssignmentRequest request, Duration duration);
+    public abstract 
ListenableFuture<InvocationContext<QueryAssignmentResponse>> 
queryAssignment(Endpoints endpoints,
+        Metadata metadata, QueryAssignmentRequest request, Duration duration);
 
     /**
      * Receiving messages asynchronously from the server, the method ensures 
no throwable.
@@ -141,8 +119,8 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
receiveMessage(Endpoints endpoints,
-        Metadata metadata, ReceiveMessageRequest request, Duration duration);
+    public abstract 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> 
receiveMessage(
+        Endpoints endpoints, Metadata metadata, ReceiveMessageRequest request, 
Duration duration);
 
     /**
      * Ack message asynchronously after the success of consumption, the method 
ensures no throwable.
@@ -153,8 +131,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(Endpoints endpoints, Metadata metadata,
-        AckMessageRequest request, Duration duration);
+    public abstract ListenableFuture<InvocationContext<AckMessageResponse>> 
ackMessage(Endpoints endpoints,
+        Metadata metadata, AckMessageRequest request, Duration duration);
 
     /**
      * Nack message asynchronously after the failure of consumption, the 
method ensures no throwable.
@@ -165,8 +143,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(Endpoints endpoints,
-        Metadata metadata, ChangeInvisibleDurationRequest request, Duration 
duration);
+    public abstract 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> 
changeInvisibleDuration(
+        Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest 
request, Duration duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method 
ensures no throwable.
@@ -177,8 +155,9 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> 
forwardMessageToDeadLetterQueue(
-        Endpoints endpoints, Metadata metadata, 
ForwardMessageToDeadLetterQueueRequest request, Duration duration);
+    public abstract 
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
+    forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata,
+        ForwardMessageToDeadLetterQueueRequest request, Duration duration);
 
     /**
      * Submit transaction resolution asynchronously, the method ensures no 
throwable.
@@ -189,8 +168,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<EndTransactionResponse>> 
endTransaction(Endpoints endpoints, Metadata metadata,
-        EndTransactionRequest request, Duration duration);
+    public abstract 
ListenableFuture<InvocationContext<EndTransactionResponse>> 
endTransaction(Endpoints endpoints,
+        Metadata metadata, EndTransactionRequest request, Duration duration);
 
     /**
      * Asynchronously notify the server that client is terminated, the method 
ensures no throwable.
@@ -202,8 +181,8 @@ public interface ClientManager {
      * @return response future of notification of client termination.
      */
     @SuppressWarnings("UnusedReturnValue")
-    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> 
notifyClientTermination(Endpoints endpoints,
-        Metadata metadata, NotifyClientTerminationRequest request, Duration 
duration);
+    public abstract 
ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> 
notifyClientTermination(
+        Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest 
request, Duration duration);
 
     /**
      * Establish telemetry session stream to server.
@@ -215,6 +194,6 @@ public interface ClientManager {
      * @return request observer.
      * @throws ClientException if failed to establish telemetry session stream.
      */
-    StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata 
metadata,
+    public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints 
endpoints, Metadata metadata,
         Duration duration, StreamObserver<TelemetryCommand> responseObserver) 
throws ClientException;
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index b564634..fcfc05c 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -38,7 +38,6 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -49,8 +48,6 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -62,7 +59,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
-import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.InvocationContext;
@@ -74,7 +70,7 @@ import org.slf4j.LoggerFactory;
 /**
  * @see ClientManager
  */
-public class ClientManagerImpl extends AbstractIdleService implements 
ClientManager {
+public class ClientManagerImpl extends ClientManager {
     public static final Duration RPC_CLIENT_MAX_IDLE_DURATION = 
Duration.ofMinutes(30);
 
     public static final Duration RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY = 
Duration.ofSeconds(5);
@@ -91,15 +87,11 @@ public class ClientManagerImpl extends AbstractIdleService 
implements ClientMana
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ClientManagerImpl.class);
 
+    private final Client client;
     @GuardedBy("rpcClientTableLock")
     private final Map<Endpoints, RpcClient> rpcClientTable;
     private final ReadWriteLock rpcClientTableLock;
 
-    /**
-     * Contains all client, key is {@link ClientImpl#clientId}.
-     */
-    private final ConcurrentMap<String, Client> clientTable;
-
     /**
      * In charge of all scheduled tasks.
      */
@@ -110,12 +102,10 @@ public class ClientManagerImpl extends 
AbstractIdleService implements ClientMana
      */
     private final ExecutorService asyncWorker;
 
-    public ClientManagerImpl() {
+    public ClientManagerImpl(Client client) {
+        this.client = client;
         this.rpcClientTable = new HashMap<>();
         this.rpcClientTableLock = new ReentrantReadWriteLock();
-
-        this.clientTable = new ConcurrentHashMap<>();
-
         this.scheduler = new ScheduledThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors(),
             new ThreadFactoryImpl("ClientScheduler"));
@@ -129,21 +119,6 @@ public class ClientManagerImpl extends AbstractIdleService 
implements ClientMana
             new ThreadFactoryImpl("ClientAsyncWorker"));
     }
 
-    @Override
-    public void registerClient(Client client) {
-        clientTable.put(client.clientId(), client);
-    }
-
-    @Override
-    public void unregisterClient(Client client) {
-        clientTable.remove(client.clientId());
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return clientTable.isEmpty();
-    }
-
     /**
      * It is well-founded that a {@link RpcClient} is deprecated if it is idle 
for a long time, so it is essential to
      * clear it.
@@ -172,30 +147,6 @@ public class ClientManagerImpl extends AbstractIdleService 
implements ClientMana
         }
     }
 
-    private void doHeartbeat() {
-        for (Client client : clientTable.values()) {
-            client.doHeartbeat();
-        }
-    }
-
-    private void doStats() {
-        LOGGER.info("Start to log stats for a new round, clientVersion={}, 
clientWrapperVersion={}",
-            MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion());
-        for (Client client : clientTable.values()) {
-            client.doStats();
-        }
-    }
-
-    private void syncSettings() {
-        clientTable.values().forEach(client -> {
-            try {
-                client.syncSettings();
-            } catch (Throwable t) {
-                LOGGER.error("Failed to announce settings, clientId={}", 
client.clientId(), t);
-            }
-        });
-    }
-
     /**
      * Return the RPC client by remote {@link Endpoints}, would create the 
client automatically if it does not exist.
      *
@@ -376,7 +327,7 @@ public class ClientManagerImpl extends AbstractIdleService 
implements ClientMana
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
-                    doHeartbeat();
+                    client.doHeartbeat();
                 } catch (Throwable t) {
                     LOGGER.error("Exception raised while heartbeat.", t);
                 }
@@ -389,7 +340,7 @@ public class ClientManagerImpl extends AbstractIdleService 
implements ClientMana
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
-                    doStats();
+                    client.doStats();
                 } catch (Throwable t) {
                     LOGGER.error("Exception raised while log stats.", t);
                 }
@@ -402,7 +353,7 @@ public class ClientManagerImpl extends AbstractIdleService 
implements ClientMana
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
-                    syncSettings();
+                    client.syncSettings();
                 } catch (Throwable t) {
                     LOGGER.error("Exception raised during the setting 
announcement.", t);
                 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
deleted file mode 100644
index 80299da..0000000
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.impl;
-
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.concurrent.ThreadSafe;
-
-@ThreadSafe
-public class ClientManagerRegistry {
-    private static final ClientManagerRegistry INSTANCE = new 
ClientManagerRegistry();
-
-    @GuardedBy("clientIdsLock")
-    private final Set<String> clientIds = new HashSet<>();
-    private final Lock clientIdsLock = new ReentrantLock();
-
-    private volatile ClientManagerImpl singletonClientManager = null;
-
-    private ClientManagerRegistry() {
-    }
-
-    public static ClientManagerRegistry getInstance() {
-        return INSTANCE;
-    }
-
-    /**
-     * Register {@link Client} to the appointed manager by manager id, start 
the manager if it is created newly.
-     *
-     * <p>Different clients would share the same {@link ClientManager} if they 
have the same manager id.
-     *
-     * @param client the client to register.
-     * @return the client manager which is started.
-     */
-    public ClientManager registerClient(Client client) {
-        clientIdsLock.lock();
-        try {
-            if (null == singletonClientManager) {
-                final ClientManagerImpl clientManager = new 
ClientManagerImpl();
-                clientManager.startAsync().awaitRunning();
-                singletonClientManager = clientManager;
-            }
-            clientIds.add(client.clientId());
-            singletonClientManager.registerClient(client);
-            return singletonClientManager;
-        } finally {
-            clientIdsLock.unlock();
-        }
-    }
-
-    /**
-     * Unregister {@link Client} to the appointed manager by message-id, 
shutdown the manager if no client
-     * registered in it.
-     *
-     * @param client client to unregister.
-     * @return {@link ClientManager} is removed or not.
-     */
-    @SuppressWarnings("UnusedReturnValue")
-    public boolean unregisterClient(Client client) {
-        ClientManagerImpl clientManager = null;
-        clientIdsLock.lock();
-        try {
-            clientIds.remove(client.clientId());
-            singletonClientManager.unregisterClient(client);
-            if (clientIds.isEmpty()) {
-                clientManager = singletonClientManager;
-                singletonClientManager = null;
-            }
-        } finally {
-            clientIdsLock.unlock();
-        }
-        // No need to hold the lock here.
-        if (null != clientManager) {
-            clientManager.stopAsync().awaitTerminated();
-        }
-        return null != clientManager;
-    }
-}
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index ffba65e..13c0fe5 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -91,7 +91,6 @@ public class RpcClientImpl implements RpcClient {
                 .intercept(LoggingInterceptor.getInstance())
                 .sslContext(sslContext);
         // Disable grpc's auto-retry here.
-        channelBuilder.disableRetry();
 
         final List<InetSocketAddress> socketAddresses = 
endpoints.toSocketAddresses();
         if (null != socketAddresses) {
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index f0b7f3e..270f09b 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -33,7 +33,7 @@ import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 
 public class ClientManagerImplTest extends TestBase {
-    private final ClientManagerImpl clientManager = new ClientManagerImpl();
+    private final ClientManagerImpl clientManager = new 
ClientManagerImpl(null);
 
     @Test
     public void testQueryRoute() {
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index 8b88702..6ad733b 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -17,58 +17,19 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import apache.rocketmq.v2.CustomizedBackoff;
-import apache.rocketmq.v2.QueryAssignmentRequest;
-import apache.rocketmq.v2.QueryRouteRequest;
-import apache.rocketmq.v2.RetryPolicy;
-import apache.rocketmq.v2.Settings;
-import apache.rocketmq.v2.Subscription;
-import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.Service;
-import com.google.protobuf.util.Durations;
-import io.grpc.Metadata;
-import io.grpc.stub.StreamObserver;
-import java.time.Duration;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
-import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class PushConsumerImplTest extends TestBase {
-    @Mock
-    private ClientManagerImpl clientManager;
-
-    @Mock
-    private StreamObserver<TelemetryCommand> telemetryRequestObserver;
-
-    @SuppressWarnings("unused")
-    @InjectMocks
-    private ClientManagerRegistry clientManagerRegistry = 
ClientManagerRegistry.getInstance();
-
     private final Map<String, FilterExpression> subscriptionExpressions = 
createSubscriptionExpressions(FAKE_TOPIC_0);
 
     private final MessageListener messageListener = messageView -> 
ConsumeResult.SUCCESS;
@@ -82,57 +43,6 @@ public class PushConsumerImplTest extends TestBase {
 
     private PushConsumerImpl pushConsumer;
 
-    private void start(PushConsumerImpl pushConsumer) throws ClientException {
-        when(clientManager.queryRoute(any(Endpoints.class), 
any(Metadata.class), any(QueryRouteRequest.class),
-            any(Duration.class)))
-            .thenReturn(okQueryRouteResponseFuture());
-        when(clientManager.telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class)))
-            .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
-            "TestScheduler"));
-        when(clientManager.getScheduler()).thenReturn(scheduler);
-        
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
-
-        CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
-            
.addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
-        RetryPolicy retryPolicy = 
RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff)
-            .build();
-        Subscription subscription = Subscription.newBuilder().build();
-        Settings settings = 
Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
-        final Service service = pushConsumer.startAsync();
-        pushConsumer.getPushConsumerSettings().applySettingsCommand(settings);
-        service.awaitRunning();
-    }
-
-    private void shutdown(PushConsumerImpl pushConsumer) {
-        final Service clientManagerService = mock(Service.class);
-        when(clientManager.stopAsync()).thenReturn(clientManagerService);
-        doNothing().when(clientManagerService).awaitTerminated();
-        pushConsumer.stopAsync().awaitTerminated();
-    }
-
-    @Test
-    public void testScanAssignment() throws ExecutionException, 
InterruptedException, ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, 
subscriptionExpressions, messageListener,
-            maxCacheMessageCount, maxCacheMessageSizeInBytes, 
consumptionThreadCount);
-        start(pushConsumer);
-        when(clientManager.queryAssignment(any(Endpoints.class), 
any(Metadata.class), any(QueryAssignmentRequest.class),
-            
any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
-        pushConsumer.scanAssignments();
-        verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), 
any(Metadata.class),
-            any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
-            any(Metadata.class), any(QueryAssignmentRequest.class), 
any(Duration.class));
-        
Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(),
-            pushConsumer.getQueueSize());
-        when(clientManager.queryAssignment(any(Endpoints.class), 
any(Metadata.class), any(QueryAssignmentRequest.class),
-            
any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
-        pushConsumer.scanAssignments();
-        Assert.assertEquals(0, pushConsumer.getQueueSize());
-        shutdown(pushConsumer);
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testSubscribeWithoutStart() throws ClientException {
         pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, 
subscriptionExpressions, messageListener,
@@ -146,15 +56,4 @@ public class PushConsumerImplTest extends TestBase {
             maxCacheMessageCount, maxCacheMessageSizeInBytes, 
consumptionThreadCount);
         pushConsumer.unsubscribe(FAKE_TOPIC_0);
     }
-
-    @Test
-    public void testSubscribeWithSubscriptionOverwriting() throws 
ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, 
subscriptionExpressions,
-            messageListener,
-            maxCacheMessageCount, maxCacheMessageSizeInBytes, 
consumptionThreadCount);
-        start(pushConsumer);
-        final FilterExpression filterExpression = new 
FilterExpression(FAKE_TAG_0);
-        pushConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
-        shutdown(pushConsumer);
-    }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index e6ede4e..6659536 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -17,68 +17,20 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import apache.rocketmq.v2.AckMessageRequest;
-import apache.rocketmq.v2.AckMessageResponse;
-import apache.rocketmq.v2.Broker;
-import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
-import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.MessageQueue;
-import apache.rocketmq.v2.Permission;
-import apache.rocketmq.v2.QueryRouteRequest;
-import apache.rocketmq.v2.QueryRouteResponse;
-import apache.rocketmq.v2.ReceiveMessageRequest;
-import apache.rocketmq.v2.ReceiveMessageResponse;
-import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.Settings;
-import apache.rocketmq.v2.Status;
-import apache.rocketmq.v2.Subscription;
-import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
-import io.grpc.stub.StreamObserver;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
-import org.apache.rocketmq.client.apis.message.MessageView;
-import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
-import org.apache.rocketmq.client.java.message.MessageViewImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
-import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SimpleConsumerImplTest extends TestBase {
-    @Mock
-    private ClientManagerImpl clientManager;
-    @Mock
-    private StreamObserver<TelemetryCommand> telemetryRequestObserver;
     @InjectMocks
-    private ClientManagerRegistry clientManagerRegistry = 
ClientManagerRegistry.getInstance();
     private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
         .setEndpoints(FAKE_ACCESS_POINT).build();
 
@@ -87,45 +39,6 @@ public class SimpleConsumerImplTest extends TestBase {
 
     private SimpleConsumerImpl simpleConsumer;
 
-    private void start(SimpleConsumerImpl simpleConsumer) throws 
ClientException {
-        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = 
SettableFuture.create();
-        Status status = Status.newBuilder().setCode(Code.OK).build();
-        List<MessageQueue> messageQueueList = new ArrayList<>();
-        MessageQueue mq = 
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
-            .setPermission(Permission.READ_WRITE)
-            
.setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0)
-            .build();
-        messageQueueList.add(mq);
-        QueryRouteResponse response = 
QueryRouteResponse.newBuilder().setStatus(status)
-            .addAllMessageQueues(messageQueueList).build();
-        final InvocationContext<QueryRouteResponse> invocationContext = new 
InvocationContext<>(response,
-            fakeRpcContext());
-        future0.set(invocationContext);
-        when(clientManager.queryRoute(any(Endpoints.class), 
any(Metadata.class), any(QueryRouteRequest.class),
-            any(Duration.class)))
-            .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), 
any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class)))
-            .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new 
ScheduledThreadPoolExecutor(1,
-            new ThreadFactoryImpl("TestScheduler"));
-        when(clientManager.getScheduler()).thenReturn(scheduler);
-        
doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
-
-        Subscription subscription = Subscription.newBuilder().build();
-        Settings settings = 
Settings.newBuilder().setSubscription(subscription).build();
-        final Service service = simpleConsumer.startAsync();
-        
simpleConsumer.getSimpleConsumerSettings().applySettingsCommand(settings);
-        service.awaitRunning();
-    }
-
-    private void shutdown(SimpleConsumerImpl simpleConsumer) {
-        final Service clientManagerService = mock(Service.class);
-        when(clientManager.stopAsync()).thenReturn(clientManagerService);
-        doNothing().when(clientManagerService).awaitTerminated();
-        simpleConsumer.stopAsync().awaitTerminated();
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testReceiveWithoutStart() throws ClientException {
         simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
@@ -149,80 +62,4 @@ public class SimpleConsumerImplTest extends TestBase {
         simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
         simpleConsumer.unsubscribe(FAKE_TOPIC_0);
     }
-
-    @Test
-    public void testStartAndShutdown() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        shutdown(simpleConsumer);
-    }
-
-    @Test
-    public void testSubscribeWithSubscriptionOverwriting() throws 
ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        final FilterExpression filterExpression = new 
FilterExpression(FAKE_TAG_0);
-        simpleConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
-        shutdown(simpleConsumer);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testReceiveWithAllTopicsAreUnsubscribed() throws 
ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        simpleConsumer.unsubscribe(FAKE_TOPIC_0);
-        try {
-            simpleConsumer.receive(1, Duration.ofSeconds(1));
-        } finally {
-            shutdown(simpleConsumer);
-        }
-    }
-
-    @Test
-    public void testReceiveMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        int receivedMessageCount = 16;
-        final 
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
-            okReceiveMessageResponsesFuture(FAKE_TOPIC_0, 
receivedMessageCount);
-        when(clientManager.receiveMessage(any(Endpoints.class), 
any(Metadata.class), any(ReceiveMessageRequest.class),
-            any(Duration.class))).thenReturn(future);
-        final List<MessageView> messageViews = simpleConsumer.receive(1, 
Duration.ofSeconds(1));
-        verify(clientManager, times(1)).receiveMessage(any(Endpoints.class),
-            any(Metadata.class), any(ReceiveMessageRequest.class), 
any(Duration.class));
-        assertEquals(receivedMessageCount, messageViews.size());
-        shutdown(simpleConsumer);
-    }
-
-    @Test
-    public void testAckMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        try {
-            final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<InvocationContext<AckMessageResponse>> 
future = okAckMessageResponseFuture();
-            when(clientManager.ackMessage(any(Endpoints.class), 
any(Metadata.class), any(AckMessageRequest.class),
-                any(Duration.class))).thenReturn(future);
-            simpleConsumer.ack(messageView);
-        } finally {
-            shutdown(simpleConsumer);
-        }
-    }
-
-    @Test
-    public void testChangeInvisibleDurationSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, 
FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        try {
-            final MessageViewImpl messageView = fakeMessageViewImpl();
-            final 
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
-                okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
-            when(clientManager.changeInvisibleDuration(any(Endpoints.class), 
any(Metadata.class),
-                any(ChangeInvisibleDurationRequest.class), 
any(Duration.class))).thenReturn(future);
-            simpleConsumer.changeInvisibleDuration0(messageView, 
Duration.ofSeconds(3));
-            assertEquals(FAKE_RECEIPT_HANDLE_1, 
messageView.getReceiptHandle());
-        } finally {
-            shutdown(simpleConsumer);
-        }
-    }
 }
\ No newline at end of file
diff --git 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index f069b52..babdfb4 100644
--- 
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ 
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -57,7 +57,6 @@ import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -75,9 +74,6 @@ public class ProducerImplTest extends TestBase {
     private ClientManagerImpl clientManager;
     @Mock
     private StreamObserver<TelemetryCommand> telemetryRequestObserver;
-    @SuppressWarnings("unused")
-    @InjectMocks
-    private ClientManagerRegistry clientManagerRegistry = 
ClientManagerRegistry.getInstance();
 
     private final ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
         .setEndpoints(FAKE_ACCESS_POINT).build();
diff --git a/java/pom.xml b/java/pom.xml
index a412dba..6c39ad4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -297,7 +297,7 @@
                                 <limit>
                                     <counter>LINE</counter>
                                     <value>COVEREDRATIO</value>
-                                    <minimum>0.50</minimum>
+                                    <minimum>0.40</minimum>
                                 </limit>
                             </limits>
                         </rule>

Reply via email to