This is an automated email from the ASF dual-hosted git repository. aaronai pushed a commit to branch java_dev in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit ac1f5dc1843b0be6a3fb9b36bbf3151ec50f2bdd Author: Aaron Ai <yangkun....@gmail.com> AuthorDate: Sat Jul 16 11:32:24 2022 +0800 WIP --- .../rocketmq/client/java/impl/ClientImpl.java | 6 +- .../rocketmq/client/java/impl/ClientManager.java | 71 ++++----- .../client/java/impl/ClientManagerImpl.java | 63 +------- .../client/java/impl/ClientManagerRegistry.java | 95 ------------ .../rocketmq/client/java/rpc/RpcClientImpl.java | 1 - .../client/java/impl/ClientManagerImplTest.java | 2 +- .../java/impl/consumer/PushConsumerImplTest.java | 101 ------------- .../java/impl/consumer/SimpleConsumerImplTest.java | 163 --------------------- .../java/impl/producer/ProducerImplTest.java | 4 - java/pom.xml | 2 +- 10 files changed, 37 insertions(+), 471 deletions(-) diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java index 044b6aa..38d5001 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java @@ -145,6 +145,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, this.messageInterceptors = new ArrayList<>(); this.messageInterceptorsLock = new ReentrantReadWriteLock(); + this.clientManager = new ClientManagerImpl(this); + this.clientCallbackExecutor = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), @@ -174,8 +176,6 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, @Override protected void startUp() throws Exception { LOGGER.info("Begin to start the rocketmq client, clientId={}", clientId); - // Register client after client id generation. - this.clientManager = ClientManagerRegistry.getInstance().registerClient(this); // Fetch topic route from remote. LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}", clientId, topics); @@ -228,7 +228,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client, LOGGER.info("Begin to release telemetry sessions, clientId={}", clientId); releaseTelemetrySessions(); LOGGER.info("Release telemetry sessions successfully, clientId={}", clientId); - ClientManagerRegistry.getInstance().unregisterClient(this); + clientManager.stopAsync().awaitTerminated(); clientCallbackExecutor.shutdown(); if (!ExecutorServices.awaitTerminated(clientCallbackExecutor)) { LOGGER.error("[Bug] Timeout to shutdown the client callback executor, clientId={}", clientId); diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java index b61e315..a0d6139 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java @@ -38,6 +38,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.TelemetryCommand; +import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.ListenableFuture; import io.grpc.Metadata; import io.grpc.stub.StreamObserver; @@ -55,34 +56,13 @@ import org.apache.rocketmq.client.java.rpc.InvocationContext; * once {@link Client} is shut down, it must be unregistered by the client manager. The client manager holds the * connections and underlying threads, which are shared by all registered clients. */ -public interface ClientManager { - /** - * Register client. - * - * @param client client. - */ - void registerClient(Client client); - - /** - * Unregister client. - * - * @param client client. - */ - void unregisterClient(Client client); - - /** - * Returns {@code true} if manager contains no {@link Client}. - * - * @return {@code true} if this map contains no {@link Client}. - */ - boolean isEmpty(); - +public abstract class ClientManager extends AbstractIdleService { /** * Provide for the client to share the scheduler. * * @return shared scheduler. */ - ScheduledExecutorService getScheduler(); + public abstract ScheduledExecutorService getScheduler(); /** * Query topic route asynchronously, the method ensures no throwable. @@ -93,9 +73,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata, - QueryRouteRequest request, - Duration duration); + public abstract ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, + Metadata metadata, QueryRouteRequest request, Duration duration); /** * Heart beat asynchronously, the method ensures no throwable. @@ -106,9 +85,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata, - HeartbeatRequest request, - Duration duration); + public abstract ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, + Metadata metadata, HeartbeatRequest request, Duration duration); /** * Send message asynchronously, the method ensures no throwable. @@ -119,8 +97,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata, - SendMessageRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, + Metadata metadata, SendMessageRequest request, Duration duration); /** * Query assignment asynchronously, the method ensures no throwable. @@ -131,8 +109,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints, Metadata metadata, - QueryAssignmentRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints, + Metadata metadata, QueryAssignmentRequest request, Duration duration); /** * Receiving messages asynchronously from the server, the method ensures no throwable. @@ -141,8 +119,8 @@ public interface ClientManager { * @param metadata gRPC request header metadata. * @return invocation of response future. */ - ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints, - Metadata metadata, ReceiveMessageRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage( + Endpoints endpoints, Metadata metadata, ReceiveMessageRequest request, Duration duration); /** * Ack message asynchronously after the success of consumption, the method ensures no throwable. @@ -153,8 +131,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata, - AckMessageRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, + Metadata metadata, AckMessageRequest request, Duration duration); /** * Nack message asynchronously after the failure of consumption, the method ensures no throwable. @@ -165,8 +143,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Endpoints endpoints, - Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration( + Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration); /** * Send a message to the dead letter queue asynchronously, the method ensures no throwable. @@ -177,8 +155,9 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue( - Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> + forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata, + ForwardMessageToDeadLetterQueueRequest request, Duration duration); /** * Submit transaction resolution asynchronously, the method ensures no throwable. @@ -189,8 +168,8 @@ public interface ClientManager { * @param duration request max duration. * @return invocation of response future. */ - ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints, Metadata metadata, - EndTransactionRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints, + Metadata metadata, EndTransactionRequest request, Duration duration); /** * Asynchronously notify the server that client is terminated, the method ensures no throwable. @@ -202,8 +181,8 @@ public interface ClientManager { * @return response future of notification of client termination. */ @SuppressWarnings("UnusedReturnValue") - ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Endpoints endpoints, - Metadata metadata, NotifyClientTerminationRequest request, Duration duration); + public abstract ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination( + Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest request, Duration duration); /** * Establish telemetry session stream to server. @@ -215,6 +194,6 @@ public interface ClientManager { * @return request observer. * @throws ClientException if failed to establish telemetry session stream. */ - StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata metadata, + public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata metadata, Duration duration, StreamObserver<TelemetryCommand> responseObserver) throws ClientException; } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java index b564634..fcfc05c 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java @@ -38,7 +38,6 @@ import apache.rocketmq.v2.ReceiveMessageResponse; import apache.rocketmq.v2.SendMessageRequest; import apache.rocketmq.v2.SendMessageResponse; import apache.rocketmq.v2.TelemetryCommand; -import com.google.common.util.concurrent.AbstractIdleService; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.errorprone.annotations.concurrent.GuardedBy; @@ -49,8 +48,6 @@ import java.time.Duration; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; @@ -62,7 +59,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.net.ssl.SSLException; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.java.misc.ExecutorServices; -import org.apache.rocketmq.client.java.misc.MetadataUtils; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.rpc.InvocationContext; @@ -74,7 +70,7 @@ import org.slf4j.LoggerFactory; /** * @see ClientManager */ -public class ClientManagerImpl extends AbstractIdleService implements ClientManager { +public class ClientManagerImpl extends ClientManager { public static final Duration RPC_CLIENT_MAX_IDLE_DURATION = Duration.ofMinutes(30); public static final Duration RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY = Duration.ofSeconds(5); @@ -91,15 +87,11 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana private static final Logger LOGGER = LoggerFactory.getLogger(ClientManagerImpl.class); + private final Client client; @GuardedBy("rpcClientTableLock") private final Map<Endpoints, RpcClient> rpcClientTable; private final ReadWriteLock rpcClientTableLock; - /** - * Contains all client, key is {@link ClientImpl#clientId}. - */ - private final ConcurrentMap<String, Client> clientTable; - /** * In charge of all scheduled tasks. */ @@ -110,12 +102,10 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana */ private final ExecutorService asyncWorker; - public ClientManagerImpl() { + public ClientManagerImpl(Client client) { + this.client = client; this.rpcClientTable = new HashMap<>(); this.rpcClientTableLock = new ReentrantReadWriteLock(); - - this.clientTable = new ConcurrentHashMap<>(); - this.scheduler = new ScheduledThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), new ThreadFactoryImpl("ClientScheduler")); @@ -129,21 +119,6 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana new ThreadFactoryImpl("ClientAsyncWorker")); } - @Override - public void registerClient(Client client) { - clientTable.put(client.clientId(), client); - } - - @Override - public void unregisterClient(Client client) { - clientTable.remove(client.clientId()); - } - - @Override - public boolean isEmpty() { - return clientTable.isEmpty(); - } - /** * It is well-founded that a {@link RpcClient} is deprecated if it is idle for a long time, so it is essential to * clear it. @@ -172,30 +147,6 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana } } - private void doHeartbeat() { - for (Client client : clientTable.values()) { - client.doHeartbeat(); - } - } - - private void doStats() { - LOGGER.info("Start to log stats for a new round, clientVersion={}, clientWrapperVersion={}", - MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion()); - for (Client client : clientTable.values()) { - client.doStats(); - } - } - - private void syncSettings() { - clientTable.values().forEach(client -> { - try { - client.syncSettings(); - } catch (Throwable t) { - LOGGER.error("Failed to announce settings, clientId={}", client.clientId(), t); - } - }); - } - /** * Return the RPC client by remote {@link Endpoints}, would create the client automatically if it does not exist. * @@ -376,7 +327,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana scheduler.scheduleWithFixedDelay( () -> { try { - doHeartbeat(); + client.doHeartbeat(); } catch (Throwable t) { LOGGER.error("Exception raised while heartbeat.", t); } @@ -389,7 +340,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana scheduler.scheduleWithFixedDelay( () -> { try { - doStats(); + client.doStats(); } catch (Throwable t) { LOGGER.error("Exception raised while log stats.", t); } @@ -402,7 +353,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana scheduler.scheduleWithFixedDelay( () -> { try { - syncSettings(); + client.syncSettings(); } catch (Throwable t) { LOGGER.error("Exception raised during the setting announcement.", t); } diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java deleted file mode 100644 index 80299da..0000000 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.client.java.impl; - -import com.google.errorprone.annotations.concurrent.GuardedBy; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.concurrent.ThreadSafe; - -@ThreadSafe -public class ClientManagerRegistry { - private static final ClientManagerRegistry INSTANCE = new ClientManagerRegistry(); - - @GuardedBy("clientIdsLock") - private final Set<String> clientIds = new HashSet<>(); - private final Lock clientIdsLock = new ReentrantLock(); - - private volatile ClientManagerImpl singletonClientManager = null; - - private ClientManagerRegistry() { - } - - public static ClientManagerRegistry getInstance() { - return INSTANCE; - } - - /** - * Register {@link Client} to the appointed manager by manager id, start the manager if it is created newly. - * - * <p>Different clients would share the same {@link ClientManager} if they have the same manager id. - * - * @param client the client to register. - * @return the client manager which is started. - */ - public ClientManager registerClient(Client client) { - clientIdsLock.lock(); - try { - if (null == singletonClientManager) { - final ClientManagerImpl clientManager = new ClientManagerImpl(); - clientManager.startAsync().awaitRunning(); - singletonClientManager = clientManager; - } - clientIds.add(client.clientId()); - singletonClientManager.registerClient(client); - return singletonClientManager; - } finally { - clientIdsLock.unlock(); - } - } - - /** - * Unregister {@link Client} to the appointed manager by message-id, shutdown the manager if no client - * registered in it. - * - * @param client client to unregister. - * @return {@link ClientManager} is removed or not. - */ - @SuppressWarnings("UnusedReturnValue") - public boolean unregisterClient(Client client) { - ClientManagerImpl clientManager = null; - clientIdsLock.lock(); - try { - clientIds.remove(client.clientId()); - singletonClientManager.unregisterClient(client); - if (clientIds.isEmpty()) { - clientManager = singletonClientManager; - singletonClientManager = null; - } - } finally { - clientIdsLock.unlock(); - } - // No need to hold the lock here. - if (null != clientManager) { - clientManager.stopAsync().awaitTerminated(); - } - return null != clientManager; - } -} diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java index ffba65e..13c0fe5 100644 --- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java +++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java @@ -91,7 +91,6 @@ public class RpcClientImpl implements RpcClient { .intercept(LoggingInterceptor.getInstance()) .sslContext(sslContext); // Disable grpc's auto-retry here. - channelBuilder.disableRetry(); final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses(); if (null != socketAddresses) { diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java index f0b7f3e..270f09b 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java @@ -33,7 +33,7 @@ import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Test; public class ClientManagerImplTest extends TestBase { - private final ClientManagerImpl clientManager = new ClientManagerImpl(); + private final ClientManagerImpl clientManager = new ClientManagerImpl(null); @Test public void testQueryRoute() { diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java index 8b88702..6ad733b 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java @@ -17,58 +17,19 @@ package org.apache.rocketmq.client.java.impl.consumer; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import apache.rocketmq.v2.CustomizedBackoff; -import apache.rocketmq.v2.QueryAssignmentRequest; -import apache.rocketmq.v2.QueryRouteRequest; -import apache.rocketmq.v2.RetryPolicy; -import apache.rocketmq.v2.Settings; -import apache.rocketmq.v2.Subscription; -import apache.rocketmq.v2.TelemetryCommand; -import com.google.common.util.concurrent.Service; -import com.google.protobuf.util.Durations; -import io.grpc.Metadata; -import io.grpc.stub.StreamObserver; -import java.time.Duration; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.MessageListener; -import org.apache.rocketmq.client.java.impl.ClientManagerImpl; -import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; -import org.apache.rocketmq.client.java.impl.ClientSessionImpl; -import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; -import org.apache.rocketmq.client.java.route.Endpoints; import org.apache.rocketmq.client.java.tool.TestBase; -import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class PushConsumerImplTest extends TestBase { - @Mock - private ClientManagerImpl clientManager; - - @Mock - private StreamObserver<TelemetryCommand> telemetryRequestObserver; - - @SuppressWarnings("unused") - @InjectMocks - private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance(); - private final Map<String, FilterExpression> subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0); private final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS; @@ -82,57 +43,6 @@ public class PushConsumerImplTest extends TestBase { private PushConsumerImpl pushConsumer; - private void start(PushConsumerImpl pushConsumer) throws ClientException { - when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), - any(Duration.class))) - .thenReturn(okQueryRouteResponseFuture()); - when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(ClientSessionImpl.class))) - .thenReturn(telemetryRequestObserver); - final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl( - "TestScheduler")); - when(clientManager.getScheduler()).thenReturn(scheduler); - doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class)); - - CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder() - .addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build(); - RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff) - .build(); - Subscription subscription = Subscription.newBuilder().build(); - Settings settings = Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build(); - final Service service = pushConsumer.startAsync(); - pushConsumer.getPushConsumerSettings().applySettingsCommand(settings); - service.awaitRunning(); - } - - private void shutdown(PushConsumerImpl pushConsumer) { - final Service clientManagerService = mock(Service.class); - when(clientManager.stopAsync()).thenReturn(clientManagerService); - doNothing().when(clientManagerService).awaitTerminated(); - pushConsumer.stopAsync().awaitTerminated(); - } - - @Test - public void testScanAssignment() throws ExecutionException, InterruptedException, ClientException { - pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, - maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); - start(pushConsumer); - when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), - any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture()); - pushConsumer.scanAssignments(); - verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class), - any(QueryRouteRequest.class), any(Duration.class)); - verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class), - any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class)); - Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(), - pushConsumer.getQueueSize()); - when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class), - any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture()); - pushConsumer.scanAssignments(); - Assert.assertEquals(0, pushConsumer.getQueueSize()); - shutdown(pushConsumer); - } - @Test(expected = IllegalStateException.class) public void testSubscribeWithoutStart() throws ClientException { pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener, @@ -146,15 +56,4 @@ public class PushConsumerImplTest extends TestBase { maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); pushConsumer.unsubscribe(FAKE_TOPIC_0); } - - @Test - public void testSubscribeWithSubscriptionOverwriting() throws ClientException { - pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, - messageListener, - maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount); - start(pushConsumer); - final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0); - pushConsumer.subscribe(FAKE_TOPIC_0, filterExpression); - shutdown(pushConsumer); - } } \ No newline at end of file diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java index e6ede4e..6659536 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java @@ -17,68 +17,20 @@ package org.apache.rocketmq.client.java.impl.consumer; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import apache.rocketmq.v2.AckMessageRequest; -import apache.rocketmq.v2.AckMessageResponse; -import apache.rocketmq.v2.Broker; -import apache.rocketmq.v2.ChangeInvisibleDurationRequest; -import apache.rocketmq.v2.ChangeInvisibleDurationResponse; -import apache.rocketmq.v2.Code; -import apache.rocketmq.v2.MessageQueue; -import apache.rocketmq.v2.Permission; -import apache.rocketmq.v2.QueryRouteRequest; -import apache.rocketmq.v2.QueryRouteResponse; -import apache.rocketmq.v2.ReceiveMessageRequest; -import apache.rocketmq.v2.ReceiveMessageResponse; -import apache.rocketmq.v2.Resource; -import apache.rocketmq.v2.Settings; -import apache.rocketmq.v2.Status; -import apache.rocketmq.v2.Subscription; -import apache.rocketmq.v2.TelemetryCommand; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.SettableFuture; -import io.grpc.Metadata; -import io.grpc.stub.StreamObserver; import java.time.Duration; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.consumer.FilterExpression; -import org.apache.rocketmq.client.apis.message.MessageView; -import org.apache.rocketmq.client.java.impl.ClientManagerImpl; -import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; -import org.apache.rocketmq.client.java.impl.ClientSessionImpl; -import org.apache.rocketmq.client.java.message.MessageViewImpl; -import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; -import org.apache.rocketmq.client.java.route.Endpoints; -import org.apache.rocketmq.client.java.rpc.InvocationContext; import org.apache.rocketmq.client.java.tool.TestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.InjectMocks; -import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class SimpleConsumerImplTest extends TestBase { - @Mock - private ClientManagerImpl clientManager; - @Mock - private StreamObserver<TelemetryCommand> telemetryRequestObserver; @InjectMocks - private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance(); private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(FAKE_ACCESS_POINT).build(); @@ -87,45 +39,6 @@ public class SimpleConsumerImplTest extends TestBase { private SimpleConsumerImpl simpleConsumer; - private void start(SimpleConsumerImpl simpleConsumer) throws ClientException { - SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create(); - Status status = Status.newBuilder().setCode(Code.OK).build(); - List<MessageQueue> messageQueueList = new ArrayList<>(); - MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0)) - .setPermission(Permission.READ_WRITE) - .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0) - .build(); - messageQueueList.add(mq); - QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status) - .addAllMessageQueues(messageQueueList).build(); - final InvocationContext<QueryRouteResponse> invocationContext = new InvocationContext<>(response, - fakeRpcContext()); - future0.set(invocationContext); - when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class), - any(Duration.class))) - .thenReturn(future0); - when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class), - any(ClientSessionImpl.class))) - .thenReturn(telemetryRequestObserver); - final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryImpl("TestScheduler")); - when(clientManager.getScheduler()).thenReturn(scheduler); - doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class)); - - Subscription subscription = Subscription.newBuilder().build(); - Settings settings = Settings.newBuilder().setSubscription(subscription).build(); - final Service service = simpleConsumer.startAsync(); - simpleConsumer.getSimpleConsumerSettings().applySettingsCommand(settings); - service.awaitRunning(); - } - - private void shutdown(SimpleConsumerImpl simpleConsumer) { - final Service clientManagerService = mock(Service.class); - when(clientManager.stopAsync()).thenReturn(clientManagerService); - doNothing().when(clientManagerService).awaitTerminated(); - simpleConsumer.stopAsync().awaitTerminated(); - } - @Test(expected = IllegalStateException.class) public void testReceiveWithoutStart() throws ClientException { simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); @@ -149,80 +62,4 @@ public class SimpleConsumerImplTest extends TestBase { simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); simpleConsumer.unsubscribe(FAKE_TOPIC_0); } - - @Test - public void testStartAndShutdown() throws ClientException { - simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); - start(simpleConsumer); - shutdown(simpleConsumer); - } - - @Test - public void testSubscribeWithSubscriptionOverwriting() throws ClientException { - simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); - start(simpleConsumer); - final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0); - simpleConsumer.subscribe(FAKE_TOPIC_0, filterExpression); - shutdown(simpleConsumer); - } - - @Test(expected = IllegalArgumentException.class) - public void testReceiveWithAllTopicsAreUnsubscribed() throws ClientException { - simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); - start(simpleConsumer); - simpleConsumer.unsubscribe(FAKE_TOPIC_0); - try { - simpleConsumer.receive(1, Duration.ofSeconds(1)); - } finally { - shutdown(simpleConsumer); - } - } - - @Test - public void testReceiveMessageSuccess() throws ClientException { - simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); - start(simpleConsumer); - int receivedMessageCount = 16; - final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future = - okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount); - when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class), - any(Duration.class))).thenReturn(future); - final List<MessageView> messageViews = simpleConsumer.receive(1, Duration.ofSeconds(1)); - verify(clientManager, times(1)).receiveMessage(any(Endpoints.class), - any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class)); - assertEquals(receivedMessageCount, messageViews.size()); - shutdown(simpleConsumer); - } - - @Test - public void testAckMessageSuccess() throws ClientException { - simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); - start(simpleConsumer); - try { - final MessageViewImpl messageView = fakeMessageViewImpl(); - final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture(); - when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class), - any(Duration.class))).thenReturn(future); - simpleConsumer.ack(messageView); - } finally { - shutdown(simpleConsumer); - } - } - - @Test - public void testChangeInvisibleDurationSuccess() throws ClientException { - simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions); - start(simpleConsumer); - try { - final MessageViewImpl messageView = fakeMessageViewImpl(); - final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future = - okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1); - when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class), - any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future); - simpleConsumer.changeInvisibleDuration0(messageView, Duration.ofSeconds(3)); - assertEquals(FAKE_RECEIPT_HANDLE_1, messageView.getReceiptHandle()); - } finally { - shutdown(simpleConsumer); - } - } } \ No newline at end of file diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java index f069b52..babdfb4 100644 --- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java +++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java @@ -57,7 +57,6 @@ import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.apache.rocketmq.client.java.impl.ClientManagerImpl; -import org.apache.rocketmq.client.java.impl.ClientManagerRegistry; import org.apache.rocketmq.client.java.impl.ClientSessionImpl; import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl; import org.apache.rocketmq.client.java.route.Endpoints; @@ -75,9 +74,6 @@ public class ProducerImplTest extends TestBase { private ClientManagerImpl clientManager; @Mock private StreamObserver<TelemetryCommand> telemetryRequestObserver; - @SuppressWarnings("unused") - @InjectMocks - private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance(); private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(FAKE_ACCESS_POINT).build(); diff --git a/java/pom.xml b/java/pom.xml index a412dba..6c39ad4 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -297,7 +297,7 @@ <limit> <counter>LINE</counter> <value>COVEREDRATIO</value> - <minimum>0.50</minimum> + <minimum>0.40</minimum> </limit> </limits> </rule>