This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 9953cd7 Java: Add invocation context (#32)
9953cd7 is described below
commit 9953cd76fb70a32049c50e57b8050a80f4585ade
Author: Aaron Ai <[email protected]>
AuthorDate: Thu Jul 7 11:00:00 2022 +0800
Java: Add invocation context (#32)
---
.../rocketmq/client/java/impl/ClientImpl.java | 20 ++---
.../rocketmq/client/java/impl/ClientManager.java | 48 +++++-----
.../client/java/impl/ClientManagerImpl.java | 71 ++++++---------
.../client/java/impl/consumer/ConsumerImpl.java | 41 +++++----
.../java/impl/consumer/ProcessQueueImpl.java | 33 ++++---
.../java/impl/consumer/PushConsumerImpl.java | 27 +++---
.../java/impl/consumer/ReceiveMessageResult.java | 8 +-
.../java/impl/consumer/SimpleConsumerImpl.java | 19 ++--
.../client/java/impl/producer/ProducerImpl.java | 21 +++--
.../client/java/rpc/InvocationContext.java | 46 ++++++++++
.../apache/rocketmq/client/java/rpc/RpcClient.java | 51 ++++++-----
.../rocketmq/client/java/rpc/RpcClientImpl.java | 100 ++++++++++++++-------
.../rocketmq/client/java/rpc/RpcContext.java | 39 ++++++++
.../apache/rocketmq/client/java/rpc/Signature.java | 3 +-
.../apache/rocketmq/client/java/rpc/TLSHelper.java | 8 +-
.../java/impl/consumer/ProcessQueueImplTest.java | 15 ++--
.../java/impl/consumer/PushConsumerImplTest.java | 6 +-
.../java/impl/consumer/SimpleConsumerImplTest.java | 13 +--
.../java/impl/producer/ProducerImplTest.java | 21 +++--
.../apache/rocketmq/client/java/tool/TestBase.java | 97 +++++++++-----------
20 files changed, 409 insertions(+), 278 deletions(-)
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index a4b144d..e47bdcd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -42,7 +42,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
-import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
@@ -81,6 +80,7 @@ import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.apache.rocketmq.client.java.rpc.Signature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -555,7 +555,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
/**
* Real-time signature generation
*/
- protected Metadata sign() throws UnsupportedEncodingException,
NoSuchAlgorithmException, InvalidKeyException {
+ protected Metadata sign() throws NoSuchAlgorithmException,
InvalidKeyException {
return Signature.sign(clientConfiguration, clientId);
}
@@ -568,11 +568,12 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
private void doHeartbeat(HeartbeatRequest request, final Endpoints
endpoints) {
try {
Metadata metadata = sign();
- final ListenableFuture<HeartbeatResponse> future = clientManager
+ final ListenableFuture<InvocationContext<HeartbeatResponse>>
future = clientManager
.heartbeat(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
- Futures.addCallback(future, new
FutureCallback<HeartbeatResponse>() {
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<HeartbeatResponse>>() {
@Override
- public void onSuccess(HeartbeatResponse response) {
+ public void onSuccess(InvocationContext<HeartbeatResponse>
context) {
+ final HeartbeatResponse response = context.getResp();
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.OK != code) {
@@ -612,15 +613,15 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
}
private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final
String topic) {
- final SettableFuture<TopicRouteDataResult> future =
SettableFuture.create();
try {
Resource topicResource =
Resource.newBuilder().setName(topic).build();
final QueryRouteRequest request =
QueryRouteRequest.newBuilder().setTopic(topicResource)
.setEndpoints(accessEndpoints.toProtobuf()).build();
final Metadata metadata = sign();
- final ListenableFuture<QueryRouteResponse> responseFuture =
+ final ListenableFuture<InvocationContext<QueryRouteResponse>>
contextFuture =
clientManager.queryRoute(accessEndpoints, metadata, request,
clientConfiguration.getRequestTimeout());
- return Futures.transform(responseFuture, response -> {
+ return Futures.transform(contextFuture, context -> {
+ final QueryRouteResponse response = context.getResp();
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.OK != code) {
@@ -631,8 +632,7 @@ public abstract class ClientImpl extends
AbstractIdleService implements Client,
return new TopicRouteDataResult(new
TopicRouteData(response.getMessageQueuesList()), status);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 9f40a6d..9901ac9 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
/**
* Client manager supplies a series of unified APIs to execute remote
procedure calls for each {@link Client}.
@@ -90,9 +91,10 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request query route request.
* @param duration request max duration.
- * @return response future of the topic route.
+ * @return invocation of response future.
*/
- ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints,
Metadata metadata, QueryRouteRequest request,
+ ListenableFuture<InvocationContext<QueryRouteResponse>>
queryRoute(Endpoints endpoints, Metadata metadata,
+ QueryRouteRequest request,
Duration duration);
/**
@@ -102,9 +104,10 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request heartbeat request.
* @param duration request max duration.
- * @return response future of heartbeat.
+ * @return invocation of response future.
*/
- ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints,
Metadata metadata, HeartbeatRequest request,
+ ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints
endpoints, Metadata metadata,
+ HeartbeatRequest request,
Duration duration);
/**
@@ -114,9 +117,9 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request send message request.
* @param duration request max duration.
- * @return response future of the sending message.
+ * @return invocation of response future.
*/
- ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints,
Metadata metadata,
+ ListenableFuture<InvocationContext<SendMessageResponse>>
sendMessage(Endpoints endpoints, Metadata metadata,
SendMessageRequest request, Duration duration);
/**
@@ -126,9 +129,9 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request query assignment request.
* @param duration request max duration.
- * @return response future of query assignment.
+ * @return invocation of response future.
*/
- ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints
endpoints, Metadata metadata,
+ ListenableFuture<InvocationContext<QueryAssignmentResponse>>
queryAssignment(Endpoints endpoints, Metadata metadata,
QueryAssignmentRequest request, Duration duration);
/**
@@ -136,9 +139,10 @@ public interface ClientManager {
*
* @param endpoints requested endpoints.
* @param metadata gRPC request header metadata.
+ * @return invocation of response future.
*/
- ListenableFuture<Iterator<ReceiveMessageResponse>>
receiveMessage(Endpoints endpoints, Metadata metadata,
- ReceiveMessageRequest request, Duration duration);
+ ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>>
receiveMessage(Endpoints endpoints,
+ Metadata metadata, ReceiveMessageRequest request, Duration duration);
/**
* Ack message asynchronously after the success of consumption, the method
ensures no throwable.
@@ -147,10 +151,10 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request ack message request.
* @param duration request max duration.
- * @return response future of ack message.
+ * @return invocation of response future.
*/
- ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints,
Metadata metadata, AckMessageRequest request,
- Duration duration);
+ ListenableFuture<InvocationContext<AckMessageResponse>>
ackMessage(Endpoints endpoints, Metadata metadata,
+ AckMessageRequest request, Duration duration);
/**
* Nack message asynchronously after the failure of consumption, the
method ensures no throwable.
@@ -159,10 +163,10 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request nack message request.
* @param duration request max duration.
- * @return response future of nack message.
+ * @return invocation of response future.
*/
- ListenableFuture<ChangeInvisibleDurationResponse>
changeInvisibleDuration(Endpoints endpoints, Metadata metadata,
- ChangeInvisibleDurationRequest request, Duration duration);
+ ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(Endpoints endpoints,
+ Metadata metadata, ChangeInvisibleDurationRequest request, Duration
duration);
/**
* Send a message to the dead letter queue asynchronously, the method
ensures no throwable.
@@ -171,9 +175,9 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request request of sending a message to DLQ.
* @param duration request max duration.
- * @return response future of sending a message to DLQ.
+ * @return invocation of response future.
*/
- ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
forwardMessageToDeadLetterQueue(
+
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
forwardMessageToDeadLetterQueue(
Endpoints endpoints, Metadata metadata,
ForwardMessageToDeadLetterQueueRequest request, Duration duration);
/**
@@ -183,9 +187,9 @@ public interface ClientManager {
* @param metadata gRPC request header metadata.
* @param request end transaction request.
* @param duration request max duration.
- * @return response future of submitting transaction resolution.
+ * @return invocation of response future.
*/
- ListenableFuture<EndTransactionResponse> endTransaction(Endpoints
endpoints, Metadata metadata,
+ ListenableFuture<InvocationContext<EndTransactionResponse>>
endTransaction(Endpoints endpoints, Metadata metadata,
EndTransactionRequest request, Duration duration);
/**
@@ -198,8 +202,8 @@ public interface ClientManager {
* @return response future of notification of client termination.
*/
@SuppressWarnings("UnusedReturnValue")
- ListenableFuture<NotifyClientTerminationResponse>
notifyClientTermination(Endpoints endpoints, Metadata metadata,
- NotifyClientTerminationRequest request, Duration duration);
+ ListenableFuture<InvocationContext<NotifyClientTerminationResponse>>
notifyClientTermination(Endpoints endpoints,
+ Metadata metadata, NotifyClientTerminationRequest request, Duration
duration);
StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata
metadata,
Duration duration, StreamObserver<TelemetryCommand> responseObserver)
throws ClientException;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 4d33ab0..337edcc 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -39,8 +39,8 @@ import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
@@ -65,6 +65,7 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.MetadataUtils;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.apache.rocketmq.client.java.rpc.RpcClient;
import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
import org.slf4j.Logger;
@@ -235,132 +236,112 @@ public class ClientManagerImpl extends
AbstractIdleService implements ClientMana
}
@Override
- public ListenableFuture<QueryRouteResponse> queryRoute(Endpoints
endpoints, Metadata metadata,
+ public ListenableFuture<InvocationContext<QueryRouteResponse>>
queryRoute(Endpoints endpoints, Metadata metadata,
QueryRouteRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.queryRoute(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- final SettableFuture<QueryRouteResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints,
Metadata metadata,
+ public ListenableFuture<InvocationContext<HeartbeatResponse>>
heartbeat(Endpoints endpoints, Metadata metadata,
HeartbeatRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.heartbeat(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- final SettableFuture<HeartbeatResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<SendMessageResponse> sendMessage(Endpoints
endpoints, Metadata metadata,
+ public ListenableFuture<InvocationContext<SendMessageResponse>>
sendMessage(Endpoints endpoints, Metadata metadata,
SendMessageRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.sendMessage(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- final SettableFuture<SendMessageResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints
endpoints, Metadata metadata,
- QueryAssignmentRequest request, Duration duration) {
+ public ListenableFuture<InvocationContext<QueryAssignmentResponse>>
queryAssignment(Endpoints endpoints,
+ Metadata metadata, QueryAssignmentRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.queryAssignment(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- final SettableFuture<QueryAssignmentResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<Iterator<ReceiveMessageResponse>>
receiveMessage(Endpoints endpoints, Metadata metadata,
- ReceiveMessageRequest request, Duration duration) {
+ public
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>>
receiveMessage(Endpoints endpoints,
+ Metadata metadata, ReceiveMessageRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.receiveMessage(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- SettableFuture<Iterator<ReceiveMessageResponse>> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<AckMessageResponse> ackMessage(Endpoints
endpoints, Metadata metadata,
+ public ListenableFuture<InvocationContext<AckMessageResponse>>
ackMessage(Endpoints endpoints, Metadata metadata,
AckMessageRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.ackMessage(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- final SettableFuture<AckMessageResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<ChangeInvisibleDurationResponse>
changeInvisibleDuration(Endpoints endpoints,
- Metadata metadata, ChangeInvisibleDurationRequest request, Duration
duration) {
+ public
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(
+ Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest
request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.changeInvisibleDuration(metadata, request,
asyncWorker, duration);
} catch (Throwable t) {
- final SettableFuture<ChangeInvisibleDurationResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
forwardMessageToDeadLetterQueue(
+ public
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
forwardMessageToDeadLetterQueue(
Endpoints endpoints, Metadata metadata,
ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.forwardMessageToDeadLetterQueue(metadata,
request, asyncWorker, duration);
} catch (Throwable t) {
- final SettableFuture<ForwardMessageToDeadLetterQueueResponse>
future = SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<EndTransactionResponse> endTransaction(Endpoints
endpoints, Metadata metadata,
- EndTransactionRequest request, Duration duration) {
+ public ListenableFuture<InvocationContext<EndTransactionResponse>>
endTransaction(Endpoints endpoints,
+ Metadata metadata, EndTransactionRequest request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.endTransaction(metadata, request, asyncWorker,
duration);
} catch (Throwable t) {
- SettableFuture<EndTransactionResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
@Override
- public ListenableFuture<NotifyClientTerminationResponse>
notifyClientTermination(
+ public
ListenableFuture<InvocationContext<NotifyClientTerminationResponse>>
notifyClientTermination(
Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest
request, Duration duration) {
try {
final RpcClient rpcClient = getRpcClient(endpoints);
return rpcClient.notifyClientTermination(metadata, request,
asyncWorker, duration);
} catch (Throwable t) {
- final SettableFuture<NotifyClientTerminationResponse> future =
SettableFuture.create();
- future.setException(t);
- return future;
+ return Futures.immediateFailedFuture(t);
}
}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 539b876..a8b04ec 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,16 +72,18 @@ abstract class ConsumerImpl extends ClientImpl {
}
@SuppressWarnings("SameParameterValue")
- protected ListenableFuture<ReceiveMessageResult>
receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq,
- Duration timeout) {
+ protected ListenableFuture<ReceiveMessageResult>
receiveMessage(ReceiveMessageRequest request,
+ MessageQueueImpl mq, Duration timeout) {
List<MessageViewImpl> messages = new ArrayList<>();
final SettableFuture<ReceiveMessageResult> future0 =
SettableFuture.create();
try {
Metadata metadata = sign();
final Endpoints endpoints = mq.getBroker().getEndpoints();
- final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
clientManager.receiveMessage(endpoints,
- metadata, request, timeout);
- return Futures.transform(future, it -> {
+ final
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
+ clientManager.receiveMessage(endpoints,
+ metadata, request, timeout);
+ return Futures.transform(future, context -> {
+ final Iterator<ReceiveMessageResponse> it = context.getResp();
// Null here means status not set yet.
Status status = null;
Timestamp deliveryTimestampFromRemote = null;
@@ -106,7 +109,7 @@ abstract class ConsumerImpl extends ClientImpl {
final MessageViewImpl view =
MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
messages.add(view);
}
- return new ReceiveMessageResult(endpoints, status, messages);
+ return new ReceiveMessageResult(endpoints,
context.getRpcContext().getRequestId(), status, messages);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
future0.setException(t);
@@ -134,9 +137,9 @@ abstract class ConsumerImpl extends ClientImpl {
}
- public ListenableFuture<AckMessageResponse> ackMessage(MessageViewImpl
messageView) {
+ public ListenableFuture<InvocationContext<AckMessageResponse>>
ackMessage(MessageViewImpl messageView) {
final Endpoints endpoints = messageView.getEndpoints();
- ListenableFuture<AckMessageResponse> future;
+ ListenableFuture<InvocationContext<AckMessageResponse>> future;
final Stopwatch stopwatch = Stopwatch.createStarted();
final List<MessageCommon> messageCommons =
Collections.singletonList(messageView.getMessageCommon());
@@ -146,11 +149,14 @@ abstract class ConsumerImpl extends ClientImpl {
final Metadata metadata = sign();
future = clientManager.ackMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
} catch (Throwable t) {
- return Futures.immediateFailedFuture(t);
+ final SettableFuture<InvocationContext<AckMessageResponse>>
future0 = SettableFuture.create();
+ future0.setException(t);
+ future = future0;
}
- Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<AckMessageResponse>>() {
@Override
- public void onSuccess(AckMessageResponse response) {
+ public void onSuccess(InvocationContext<AckMessageResponse>
context) {
+ final AckMessageResponse response = context.getResp();
final Status status = response.getStatus();
final Code code = status.getCode();
final Duration duration = stopwatch.elapsed();
@@ -168,10 +174,10 @@ abstract class ConsumerImpl extends ClientImpl {
return future;
}
- public ListenableFuture<ChangeInvisibleDurationResponse>
changeInvisibleDuration(MessageViewImpl messageView,
- Duration invisibleDuration) {
+ public
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(
+ MessageViewImpl messageView, Duration invisibleDuration) {
final Endpoints endpoints = messageView.getEndpoints();
- ListenableFuture<ChangeInvisibleDurationResponse> future;
+ ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
future;
final Stopwatch stopwatch = Stopwatch.createStarted();
final List<MessageCommon> messageCommons =
Collections.singletonList(messageView.getMessageCommon());
@@ -182,14 +188,15 @@ abstract class ConsumerImpl extends ClientImpl {
future = clientManager.changeInvisibleDuration(endpoints,
metadata, request,
clientConfiguration.getRequestTimeout());
} catch (Throwable t) {
- final SettableFuture<ChangeInvisibleDurationResponse> future0 =
SettableFuture.create();
+ final
SettableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future0 =
SettableFuture.create();
future0.setException(t);
future = future0;
}
final MessageId messageId = messageView.getMessageId();
- Futures.addCallback(future, new
FutureCallback<ChangeInvisibleDurationResponse>() {
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<ChangeInvisibleDurationResponse>>() {
@Override
- public void onSuccess(ChangeInvisibleDurationResponse response) {
+ public void
onSuccess(InvocationContext<ChangeInvisibleDurationResponse> context) {
+ final ChangeInvisibleDurationResponse response =
context.getResp();
final Status status = response.getStatus();
final Code code = status.getCode();
final Duration duration = stopwatch.elapsed();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index f36012f..c914dce 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -51,6 +51,7 @@ import
org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -387,11 +388,12 @@ class ProcessQueueImpl implements ProcessQueue {
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
- final ListenableFuture<AckMessageResponse> future =
consumer.ackMessage(messageView);
- Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+ final ListenableFuture<InvocationContext<AckMessageResponse>> future =
consumer.ackMessage(messageView);
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<AckMessageResponse>>() {
@Override
- public void onSuccess(AckMessageResponse response) {
- final Status status = response.getStatus();
+ public void onSuccess(InvocationContext<AckMessageResponse>
context) {
+ final AckMessageResponse resp = context.getResp();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
if (Code.OK.equals(code)) {
LOGGER.debug("Ack message successfully, clientId={},
consumerGroup={}, messageId={}, mq={}, "
@@ -478,16 +480,17 @@ class ProcessQueueImpl implements ProcessQueue {
private void forwardToDeadLetterQueue(final MessageViewImpl messageView,
final int attempt,
final SettableFuture<Void> future0) {
- final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future
=
+ final
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
future =
consumer.forwardMessageToDeadLetterQueue(messageView);
final String clientId = consumer.getClientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
- Futures.addCallback(future, new
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
@Override
- public void onSuccess(ForwardMessageToDeadLetterQueueResponse
response) {
- final Status status = response.getStatus();
+ public void
onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
+ final ForwardMessageToDeadLetterQueueResponse resp =
context.getResp();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
// Log failure and retry later.
if (!Code.OK.equals(code)) {
@@ -559,17 +562,19 @@ class ProcessQueueImpl implements ProcessQueue {
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
- final ListenableFuture<AckMessageResponse> future =
consumer.ackMessage(messageView);
- Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+ final ListenableFuture<InvocationContext<AckMessageResponse>> future =
consumer.ackMessage(messageView);
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<AckMessageResponse>>() {
@Override
- public void onSuccess(AckMessageResponse response) {
- final Status status = response.getStatus();
+ public void onSuccess(InvocationContext<AckMessageResponse>
context) {
+ final AckMessageResponse resp = context.getResp();
+ final String requestId =
context.getRpcContext().getRequestId();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
// Log failure and retry later.
if (!Code.OK.equals(code)) {
LOGGER.error("Failed to ack fifo message, would attempt to
re-ack later, clientId={}, "
- + "consumerGroup={}, attempt={}, messageId={},
mq={}, code={}, endpoints={}, status "
- + "message=[{}]", clientId, consumerGroup,
attempt, messageId, mq, code,
+ + "consumerGroup={}, attempt={}, messageId={},
mq={}, code={}, requestId={}, endpoints={}, "
+ + "status message=[{}]", clientId, consumerGroup,
attempt, messageId, mq, code, requestId,
endpoints, status.getMessage());
ackFifoMessageLater(messageView, 1 + attempt, future0);
return;
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 4a80377..41648bd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -71,6 +71,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -264,15 +265,16 @@ class PushConsumerImpl extends ConsumerImpl implements
PushConsumer, MessageCach
private ListenableFuture<Assignments> queryAssignment(final String topic) {
final ListenableFuture<Endpoints> future =
pickEndpointsToQueryAssignments(topic);
- final ListenableFuture<QueryAssignmentResponse> responseFuture =
+ final ListenableFuture<InvocationContext<QueryAssignmentResponse>> responseFuture =
Futures.transformAsync(future, endpoints -> {
final Metadata metadata = sign();
final QueryAssignmentRequest request =
wrapQueryAssignmentRequest(topic);
return clientManager.queryAssignment(endpoints, metadata,
request,
clientConfiguration.getRequestTimeout());
}, MoreExecutors.directExecutor());
- return Futures.transformAsync(responseFuture, response -> {
- final Status status = response.getStatus();
+ return Futures.transformAsync(responseFuture, context -> {
+ final QueryAssignmentResponse resp = context.getResp();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
if (!Code.OK.equals(code)) {
final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]",
@@ -280,7 +282,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
throw new RuntimeException(message);
}
SettableFuture<Assignments> future0 = SettableFuture.create();
- final List<Assignment> assignmentList =
response.getAssignmentsList().stream().map(assignment ->
+ final List<Assignment> assignmentList =
resp.getAssignmentsList().stream().map(assignment ->
new Assignment(new
MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList());
final Assignments assignments = new Assignments(assignmentList);
future0.set(assignments);
@@ -398,7 +400,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
cacheAssignments.put(topic, latest);
return;
}
- LOGGER.info("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
+ LOGGER.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
existed, clientId);
// Process queue may be dropped, need to be
synchronized anyway.
syncProcessQueue(topic, latest, filterExpression);
@@ -509,7 +511,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
.setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
}
- public ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
forwardMessageToDeadLetterQueue(
+ public
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
forwardMessageToDeadLetterQueue(
final MessageViewImpl messageView) {
// Intercept before forwarding message to DLQ.
final Stopwatch stopwatch = Stopwatch.createStarted();
@@ -517,7 +519,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
doBefore(MessageHookPoints.FORWARD_TO_DLQ, messageCommons);
final Endpoints endpoints = messageView.getEndpoints();
- ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future;
+
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
future;
try {
final ForwardMessageToDeadLetterQueueRequest request =
wrapForwardMessageToDeadLetterQueueRequest(messageView);
@@ -525,15 +527,14 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
future = clientManager.forwardMessageToDeadLetterQueue(endpoints,
metadata, request,
clientConfiguration.getRequestTimeout());
} catch (Throwable t) {
- final SettableFuture<ForwardMessageToDeadLetterQueueResponse>
future0 = SettableFuture.create();
- future0.setException(t);
- future = future0;
+ future = Futures.immediateFailedFuture(t);
}
- Futures.addCallback(future, new
FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
@Override
- public void onSuccess(ForwardMessageToDeadLetterQueueResponse
response) {
+ public void
onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
+ final ForwardMessageToDeadLetterQueueResponse resp =
context.getResp();
final Duration duration = stopwatch.elapsed();
- MessageHookPointsStatus messageHookPointsStatus =
Code.OK.equals(response.getStatus().getCode()) ?
+ MessageHookPointsStatus messageHookPointsStatus =
Code.OK.equals(resp.getStatus().getCode()) ?
MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
// Intercept after forwarding message to DLQ.
doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons,
duration, messageHookPointsStatus);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 09b8d12..79d15a8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,12 +36,14 @@ import org.apache.rocketmq.client.java.route.Endpoints;
public class ReceiveMessageResult {
private final Endpoints endpoints;
+ private final String requestId;
private final ClientException exception;
private final List<MessageViewImpl> messages;
- public ReceiveMessageResult(Endpoints endpoints, Status status, List<MessageViewImpl> messages) {
+ public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) {
this.endpoints = endpoints;
+ this.requestId = requestId;
final Code code = status.getCode();
switch (code) {
case OK:
@@ -108,4 +110,8 @@ public class ReceiveMessageResult {
public List<MessageViewImpl> getMessages() {
return messages;
}
+
+ public String getRequestId() {
+ return requestId;
+ }
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index b0ccfd9..77750e1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,9 +241,10 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return future0;
}
MessageViewImpl impl = (MessageViewImpl) messageView;
- final ListenableFuture<AckMessageResponse> future = ackMessage(impl);
- return Futures.transformAsync(future, response -> {
- final Status status = response.getStatus();
+ final ListenableFuture<InvocationContext<AckMessageResponse>> future =
ackMessage(impl);
+ return Futures.transformAsync(future, context -> {
+ final AckMessageResponse resp = context.getResp();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
switch (code) {
case OK:
@@ -300,12 +302,13 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return future0;
}
MessageViewImpl impl = (MessageViewImpl) messageView;
- final ListenableFuture<ChangeInvisibleDurationResponse> future =
changeInvisibleDuration(impl,
- invisibleDuration);
- return Futures.transformAsync(future, response -> {
+ final
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
+ changeInvisibleDuration(impl, invisibleDuration);
+ return Futures.transformAsync(future, context -> {
+ final ChangeInvisibleDurationResponse resp = context.getResp();
// Refresh receipt handle manually.
- impl.setReceiptHandle(response.getReceiptHandle());
- final Status status = response.getStatus();
+ impl.setReceiptHandle(resp.getReceiptHandle());
+ final Status status = resp.getStatus();
final Code code = status.getCode();
switch (code) {
case OK:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 6434cca..427d318 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -280,13 +281,14 @@ class ProducerImpl extends ClientImpl implements Producer {
MessageHookPoints.COMMIT_TRANSACTION :
MessageHookPoints.ROLLBACK_TRANSACTION;
doBefore(messageHookPoints, messageCommons);
- final ListenableFuture<EndTransactionResponse> future =
+ final ListenableFuture<InvocationContext<EndTransactionResponse>>
future =
clientManager.endTransaction(endpoints, metadata, request,
requestTimeout);
- Futures.addCallback(future, new
FutureCallback<EndTransactionResponse>() {
+ Futures.addCallback(future, new
FutureCallback<InvocationContext<EndTransactionResponse>>() {
@Override
- public void onSuccess(EndTransactionResponse result) {
+ public void onSuccess(InvocationContext<EndTransactionResponse>
context) {
final Duration duration = stopwatch.elapsed();
- final Status status = result.getStatus();
+ final EndTransactionResponse resp = context.getResp();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
MessageHookPointsStatus messageHookPointsStatus =
Code.OK.equals(code) ? MessageHookPointsStatus.OK :
MessageHookPointsStatus.ERROR;
@@ -299,8 +301,9 @@ class ProducerImpl extends ClientImpl implements Producer {
doAfter(messageHookPoints, messageCommons, duration,
MessageHookPointsStatus.ERROR);
}
}, MoreExecutors.directExecutor());
- final EndTransactionResponse response = handleClientFuture(future);
- final Status status = response.getStatus();
+ final InvocationContext<EndTransactionResponse> context =
handleClientFuture(future);
+ final EndTransactionResponse resp = context.getResp();
+ final Status status = resp.getStatus();
final Code code = status.getCode();
if (!Code.OK.equals(code)) {
throw new ClientException(code.getNumber(), status.getMessage());
@@ -442,13 +445,13 @@ class ProducerImpl extends ClientImpl implements Producer {
final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
final SendMessageRequest request = wrapSendMessageRequest(messages);
- final ListenableFuture<SendMessageResponse> responseFuture =
clientManager.sendMessage(endpoints, metadata,
- request, clientConfiguration.getRequestTimeout());
+ final ListenableFuture<InvocationContext<SendMessageResponse>>
responseFuture =
+ clientManager.sendMessage(endpoints, metadata, request,
clientConfiguration.getRequestTimeout());
final ListenableFuture<List<SendReceiptImpl>> attemptFuture =
Futures.transformAsync(responseFuture,
response -> {
final SettableFuture<List<SendReceiptImpl>> future0 =
SettableFuture.create();
- future0.set(SendReceiptImpl.processSendResponse(messageQueue,
response));
+ future0.set(SendReceiptImpl.processSendResponse(messageQueue,
response.getResp()));
return future0;
}, MoreExecutors.directExecutor());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
new file mode 100644
index 0000000..eb5f487
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import com.google.common.base.MoreObjects;
+
+public class InvocationContext<T> {
+ private final T t;
+ private final RpcContext rpcContext;
+
+ public InvocationContext(T t, RpcContext rpcContext) {
+ this.t = t;
+ this.rpcContext = rpcContext;
+ }
+
+ public T getResp() {
+ return t;
+ }
+
+ public RpcContext getRpcContext() {
+ return rpcContext;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("resp", t)
+ .add("rpcContext", rpcContext)
+ .toString();
+ }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 415d228..70744f3 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -72,9 +72,10 @@ public interface RpcClient {
* @param request query route request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of topic route.
+ * @return invocation of response future.
*/
- ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata,
QueryRouteRequest request, Executor executor,
+ ListenableFuture<InvocationContext<QueryRouteResponse>>
queryRoute(Metadata metadata, QueryRouteRequest request,
+ Executor executor,
Duration duration);
/**
@@ -84,9 +85,10 @@ public interface RpcClient {
* @param request heart beat request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of heart beat.
+ * @return invocation of response future.
*/
- ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata,
HeartbeatRequest request, Executor executor,
+ ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata
metadata, HeartbeatRequest request,
+ Executor executor,
Duration duration);
/**
@@ -96,10 +98,10 @@ public interface RpcClient {
* @param request send message request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of sending message.
+ * @return invocation of response future.
*/
- ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata,
SendMessageRequest request, Executor executor,
- Duration duration);
+ ListenableFuture<InvocationContext<SendMessageResponse>>
sendMessage(Metadata metadata,
+ SendMessageRequest request, Executor executor, Duration duration);
/**
* Query assignment asynchronously.
@@ -108,10 +110,10 @@ public interface RpcClient {
* @param request query assignment request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of query assignment.
+ * @return invocation of response future.
*/
- ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata
metadata, QueryAssignmentRequest request,
- Executor executor, Duration duration);
+ ListenableFuture<InvocationContext<QueryAssignmentResponse>>
queryAssignment(Metadata metadata,
+ QueryAssignmentRequest request, Executor executor, Duration duration);
/**
* Receiving message asynchronously from server.
@@ -119,9 +121,10 @@ public interface RpcClient {
* @param metadata gRPC request header metadata.
* @param request receiving message request.
* @param executor gRPC asynchronous executor.
+ * @return invocation of response future.
*/
- ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata
metadata, ReceiveMessageRequest request,
- ExecutorService executor, Duration duration);
+ ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>>
receiveMessage(Metadata metadata,
+ ReceiveMessageRequest request, ExecutorService executor, Duration
duration);
/**
* Ack message asynchronously after success of consumption.
@@ -130,10 +133,10 @@ public interface RpcClient {
* @param request ack message request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of ack message.
+ * @return invocation of response future.
*/
- ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata,
AckMessageRequest request, Executor executor,
- Duration duration);
+ ListenableFuture<InvocationContext<AckMessageResponse>>
ackMessage(Metadata metadata, AckMessageRequest request,
+ Executor executor, Duration duration);
/**
* Change message invisible duration.
@@ -142,9 +145,9 @@ public interface RpcClient {
* @param request change invisible duration request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of change message invisible duration.
+ * @return invocation of response future.
*/
- ListenableFuture<ChangeInvisibleDurationResponse>
changeInvisibleDuration(Metadata metadata,
+ ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(Metadata metadata,
ChangeInvisibleDurationRequest request, Executor executor, Duration
duration);
/**
@@ -154,9 +157,9 @@ public interface RpcClient {
* @param request request of sending message to DLQ.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of sending message to DLQ.
+ * @return invocation of response future.
*/
- ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
forwardMessageToDeadLetterQueue(
+
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
forwardMessageToDeadLetterQueue(
Metadata metadata, ForwardMessageToDeadLetterQueueRequest request,
Executor executor, Duration duration);
/**
@@ -166,10 +169,10 @@ public interface RpcClient {
* @param request end transaction request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of submitting transaction resolution.
+ * @return invocation of response future.
*/
- ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata,
EndTransactionRequest request,
- Executor executor, Duration duration);
+ ListenableFuture<InvocationContext<EndTransactionResponse>>
endTransaction(Metadata metadata,
+ EndTransactionRequest request, Executor executor, Duration duration);
/**
* Asynchronously notify server that client is terminated.
@@ -178,9 +181,9 @@ public interface RpcClient {
* @param request notify client termination request.
* @param executor gRPC asynchronous executor.
* @param duration request max duration.
- * @return response future of notification of client termination.
+ * @return invocation of response future.
*/
- ListenableFuture<NotifyClientTerminationResponse>
notifyClientTermination(Metadata metadata,
+ ListenableFuture<InvocationContext<NotifyClientTerminationResponse>>
notifyClientTermination(Metadata metadata,
NotifyClientTerminationRequest request, Executor executor, Duration
duration);
StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor
executor, Duration duration,
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 15b94aa..ffba65e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -39,6 +39,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.SendMessageRequest;
import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.TelemetryCommand;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.ClientInterceptor;
@@ -62,10 +63,12 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import org.apache.rocketmq.client.java.route.Endpoints;
+@SuppressWarnings("UnstableApiUsage")
public class RpcClientImpl implements RpcClient {
private static final Duration KEEP_ALIVE_DURATION = Duration.ofSeconds(30);
private static final int GRPC_MAX_MESSAGE_SIZE = Integer.MAX_VALUE;
+ private final Endpoints endpoints;
private final ManagedChannel channel;
private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub;
private final MessagingServiceGrpc.MessagingServiceBlockingStub
blockingStub;
@@ -75,6 +78,7 @@ public class RpcClientImpl implements RpcClient {
@SuppressWarnings("deprecation")
public RpcClientImpl(Endpoints endpoints) throws SSLException {
+ this.endpoints = endpoints;
final SslContextBuilder builder = GrpcSslContexts.forClient();
builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
SslContext sslContext = builder.build();
@@ -102,6 +106,14 @@ public class RpcClientImpl implements RpcClient {
this.activityNanoTime = System.nanoTime();
}
+ private <T> ListenableFuture<InvocationContext<T>> wrapInvocationContext(ListenableFuture<T> future,
+ Metadata header) {
+ return Futures.transformAsync(future, response -> {
+ final RpcContext rpcContext = new RpcContext(endpoints, header);
+ return Futures.immediateFuture(new InvocationContext<>(response, rpcContext));
+ }, MoreExecutors.directExecutor());
+ }
+
@Override
public Duration idleDuration() {
return Duration.ofNanos(System.nanoTime() - activityNanoTime);
@@ -113,85 +125,105 @@ public class RpcClientImpl implements RpcClient {
}
@Override
- public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata,
QueryRouteRequest request,
- Executor executor, Duration duration) {
+ public ListenableFuture<InvocationContext<QueryRouteResponse>>
queryRoute(Metadata metadata,
+ QueryRouteRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ final ListenableFuture<QueryRouteResponse> future = futureStub
+
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
.withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).queryRoute(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata,
HeartbeatRequest request, Executor executor,
- Duration duration) {
+ public ListenableFuture<InvocationContext<HeartbeatResponse>>
heartbeat(Metadata metadata, HeartbeatRequest request,
+ Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).heartbeat(request);
+ final ListenableFuture<HeartbeatResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).heartbeat(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<SendMessageResponse> sendMessage(Metadata
metadata, SendMessageRequest request,
- Executor executor, Duration duration) {
+ public ListenableFuture<InvocationContext<SendMessageResponse>>
sendMessage(Metadata metadata,
+ SendMessageRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).sendMessage(request);
+ final ListenableFuture<SendMessageResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).sendMessage(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata
metadata, QueryAssignmentRequest request,
- Executor executor, Duration duration) {
+ public ListenableFuture<InvocationContext<QueryAssignmentResponse>>
queryAssignment(Metadata metadata,
+ QueryAssignmentRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).queryAssignment(request);
+ final ListenableFuture<QueryAssignmentResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).queryAssignment(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<Iterator<ReceiveMessageResponse>>
receiveMessage(Metadata metadata,
+ public
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>>
receiveMessage(Metadata metadata,
ReceiveMessageRequest request, ExecutorService executor, Duration
duration) {
this.activityNanoTime = System.nanoTime();
final Callable<Iterator<ReceiveMessageResponse>> callable = () ->
blockingStub
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
.withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).receiveMessage(request);
- return MoreExecutors.listeningDecorator(executor).submit(callable);
+ final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+ MoreExecutors.listeningDecorator(executor).submit(callable);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata,
AckMessageRequest request,
- Executor executor, Duration duration) {
+ public ListenableFuture<InvocationContext<AckMessageResponse>>
ackMessage(Metadata metadata,
+ AckMessageRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).ackMessage(request);
+ final ListenableFuture<AckMessageResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).ackMessage(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<ChangeInvisibleDurationResponse>
changeInvisibleDuration(Metadata metadata,
- ChangeInvisibleDurationRequest request, Executor executor, Duration
duration) {
+ public
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
changeInvisibleDuration(
+ Metadata metadata, ChangeInvisibleDurationRequest request, Executor
executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
+ final ListenableFuture<ChangeInvisibleDurationResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
forwardMessageToDeadLetterQueue(
+ public
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
forwardMessageToDeadLetterQueue(
Metadata metadata, ForwardMessageToDeadLetterQueueRequest request,
Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future
= futureStub
+
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
.withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).forwardMessageToDeadLetterQueue(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<EndTransactionResponse> endTransaction(Metadata
metadata, EndTransactionRequest request,
- Executor executor, Duration duration) {
+ public ListenableFuture<InvocationContext<EndTransactionResponse>>
endTransaction(Metadata metadata,
+ EndTransactionRequest request, Executor executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).endTransaction(request);
+ final ListenableFuture<EndTransactionResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).endTransaction(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
- public ListenableFuture<NotifyClientTerminationResponse>
notifyClientTermination(
+ public
ListenableFuture<InvocationContext<NotifyClientTerminationResponse>>
notifyClientTermination(
Metadata metadata, NotifyClientTerminationRequest request, Executor
executor, Duration duration) {
this.activityNanoTime = System.nanoTime();
- return
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
- .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).notifyClientTermination(request);
+ final ListenableFuture<NotifyClientTerminationResponse> future =
+
futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+ .withDeadlineAfter(duration.toNanos(),
TimeUnit.NANOSECONDS).notifyClientTermination(request);
+ return wrapInvocationContext(future, metadata);
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
new file mode 100644
index 0000000..5def58b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import io.grpc.Metadata;
+import org.apache.rocketmq.client.java.route.Endpoints;
+
+public class RpcContext {
+ private final Endpoints endpoints;
+ private final Metadata header;
+
+ public RpcContext(Endpoints endpoints, Metadata header) {
+ this.endpoints = endpoints;
+ this.header = header;
+ }
+
+ public String getRequestId() {
+ return header.get(Metadata.Key.of(Signature.REQUEST_ID_KEY,
Metadata.ASCII_STRING_MARSHALLER));
+ }
+
+ public Endpoints getEndpoints() {
+ return endpoints;
+ }
+}
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
index 33b9f76..89de823 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.client.java.rpc;
import io.grpc.Metadata;
-import java.io.UnsupportedEncodingException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.text.SimpleDateFormat;
@@ -53,7 +52,7 @@ public class Signature {
private Signature() {
}
- public static Metadata sign(ClientConfiguration config, String clientId)
throws UnsupportedEncodingException,
+ public static Metadata sign(ClientConfiguration config, String clientId)
throws
NoSuchAlgorithmException, InvalidKeyException {
Metadata metadata = new Metadata();
diff --git
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
index ca2f720..d70b4cd 100644
---
a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
+++
b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.rpc;
-import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
@@ -31,11 +30,10 @@ public class TLSHelper {
private TLSHelper() {
}
- public static String sign(String accessSecret, String dateTime) throws
UnsupportedEncodingException,
- NoSuchAlgorithmException,
- InvalidKeyException {
+ public static String sign(String accessSecret, String dateTime) throws
NoSuchAlgorithmException,
+ InvalidKeyException {
SecretKeySpec signingKey = new
SecretKeySpec(accessSecret.getBytes(StandardCharsets.UTF_8),
- HMAC_SHA1_ALGORITHM);
+ HMAC_SHA1_ALGORITHM);
Mac mac;
mac = Mac.getInstance(HMAC_SHA1_ALGORITHM);
mac.init(signingKey);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 401ce38..5ef44bc 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -44,8 +44,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
import org.apache.rocketmq.client.java.retry.RetryPolicy;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Before;
import org.junit.Test;
@@ -117,7 +119,7 @@ public class ProcessQueueImplTest extends TestBase {
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
when(pushConsumerSettings.isFifo()).thenReturn(false);
when(pushConsumer.changeInvisibleDuration(any(MessageViewImpl.class),
any(Duration.class)))
- .thenReturn(okChangeInvisibleDurationFuture());
+ .thenReturn(okChangeInvisibleDurationCtxFuture());
processQueue.cacheMessages(messageViewList);
verify(pushConsumer, times(1))
.changeInvisibleDuration(any(MessageViewImpl.class),
any(Duration.class));
@@ -148,7 +150,8 @@ public class ProcessQueueImplTest extends TestBase {
List<MessageViewImpl> messageViewList = new ArrayList<>();
final MessageViewImpl messageView = fakeMessageViewImpl();
messageViewList.add(messageView);
- ReceiveMessageResult receiveMessageResult = new
ReceiveMessageResult(fakeEndpoints(), status, messageViewList);
+ ReceiveMessageResult receiveMessageResult = new
ReceiveMessageResult(fakeEndpoints(),
+ RequestIdGenerator.getInstance().next(), status, messageViewList);
SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
future0.set(receiveMessageResult);
when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class),
any(MessageQueueImpl.class),
@@ -178,7 +181,7 @@ public class ProcessQueueImplTest extends TestBase {
assertEquals(cachedMessageCount, processQueue.cachedMessagesCount());
assertEquals(1, processQueue.inflightMessagesCount());
- final ListenableFuture<AckMessageResponse> future =
okAckMessageResponseFuture();
+ final ListenableFuture<InvocationContext<AckMessageResponse>> future =
okAckMessageResponseFuture();
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
processQueue.eraseMessage(optionalMessageView.get(),
ConsumeResult.SUCCESS);
future.addListener(() -> verify(pushConsumer, times(1))
@@ -221,7 +224,7 @@ public class ProcessQueueImplTest extends TestBase {
final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
messageViewList.add(messageView);
processQueue.cacheMessages(messageViewList);
- ListenableFuture<AckMessageResponse> future0 =
okAckMessageResponseFuture();
+ ListenableFuture<InvocationContext<AckMessageResponse>> future0 =
okAckMessageResponseFuture();
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
when(retryPolicy.getMaxAttempts()).thenReturn(1);
@@ -237,7 +240,7 @@ public class ProcessQueueImplTest extends TestBase {
final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
messageViewList.add(messageView);
processQueue.cacheMessages(messageViewList);
- ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
+
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
future0 =
okForwardMessageToDeadLetterQueueResponseFuture();
when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
@@ -254,7 +257,7 @@ public class ProcessQueueImplTest extends TestBase {
final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
messageViewList.add(messageView);
processQueue.cacheMessages(messageViewList);
- ListenableFuture<AckMessageResponse> future0 =
okAckMessageResponseFuture();
+ ListenableFuture<InvocationContext<AckMessageResponse>> future0 =
okAckMessageResponseFuture();
when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
when(retryPolicy.getMaxAttempts()).thenReturn(2);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index 54e507c..a7b5357 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -124,7 +124,8 @@ public class PushConsumerImplTest extends TestBase {
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
any(Metadata.class), any(QueryAssignmentRequest.class),
any(Duration.class));
-
Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(),
pushConsumer.getQueueSize());
+
Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(),
+ pushConsumer.getQueueSize());
when(clientManager.queryAssignment(any(Endpoints.class),
any(Metadata.class), any(QueryAssignmentRequest.class),
any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
pushConsumer.scanAssignments();
@@ -148,7 +149,8 @@ public class PushConsumerImplTest extends TestBase {
@Test
public void testSubscribeWithSubscriptionOverwriting() throws
ClientException {
- pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0,
subscriptionExpressions, messageListener,
+ pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0,
subscriptionExpressions,
+ messageListener,
maxCacheMessageCount, maxCacheMessageSizeInBytes,
consumptionThreadCount);
start(pushConsumer);
final FilterExpression filterExpression = new
FilterExpression(FAKE_TAG_0);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 9568a81..88445d1 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.client.java.impl.TelemetrySession;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -87,7 +88,7 @@ public class SimpleConsumerImplTest extends TestBase {
private SimpleConsumerImpl simpleConsumer;
private void start(SimpleConsumerImpl simpleConsumer) throws
ClientException {
- SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
+ SettableFuture<InvocationContext<QueryRouteResponse>> future0 =
SettableFuture.create();
Status status = Status.newBuilder().setCode(Code.OK).build();
List<MessageQueue> messageQueueList = new ArrayList<>();
MessageQueue mq =
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -97,7 +98,9 @@ public class SimpleConsumerImplTest extends TestBase {
messageQueueList.add(mq);
QueryRouteResponse response =
QueryRouteResponse.newBuilder().setStatus(status)
.addAllMessageQueues(messageQueueList).build();
- future0.set(response);
+ final InvocationContext<QueryRouteResponse> invocationContext = new
InvocationContext<>(response,
+ fakeRpcContext());
+ future0.set(invocationContext);
when(clientManager.queryRoute(any(Endpoints.class),
any(Metadata.class), any(QueryRouteRequest.class),
any(Duration.class)))
.thenReturn(future0);
@@ -180,7 +183,7 @@ public class SimpleConsumerImplTest extends TestBase {
simpleConsumer = new SimpleConsumerImpl(clientConfiguration,
FAKE_GROUP_0, awaitDuration, subExpressions);
start(simpleConsumer);
int receivedMessageCount = 16;
- final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+ final
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
okReceiveMessageResponsesFuture(FAKE_TOPIC_0,
receivedMessageCount);
when(clientManager.receiveMessage(any(Endpoints.class),
any(Metadata.class), any(ReceiveMessageRequest.class),
any(Duration.class))).thenReturn(future);
@@ -197,7 +200,7 @@ public class SimpleConsumerImplTest extends TestBase {
start(simpleConsumer);
try {
final MessageViewImpl messageView = fakeMessageViewImpl();
- final ListenableFuture<AckMessageResponse> future =
okAckMessageResponseFuture();
+ final ListenableFuture<InvocationContext<AckMessageResponse>>
future = okAckMessageResponseFuture();
when(clientManager.ackMessage(any(Endpoints.class),
any(Metadata.class), any(AckMessageRequest.class),
any(Duration.class))).thenReturn(future);
simpleConsumer.ack(messageView);
@@ -212,7 +215,7 @@ public class SimpleConsumerImplTest extends TestBase {
start(simpleConsumer);
try {
final MessageViewImpl messageView = fakeMessageViewImpl();
- final ListenableFuture<ChangeInvisibleDurationResponse> future =
+ final
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
when(clientManager.changeInvisibleDuration(any(Endpoints.class),
any(Metadata.class),
any(ChangeInvisibleDurationRequest.class),
any(Duration.class))).thenReturn(future);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 907a6fc..619538e 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -61,6 +61,7 @@ import
org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
import org.apache.rocketmq.client.java.impl.TelemetrySession;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.apache.rocketmq.client.java.tool.TestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -94,7 +95,7 @@ public class ProducerImplTest extends TestBase {
null);
private void start(ProducerImpl producer) throws ClientException {
- SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
+ SettableFuture<InvocationContext<QueryRouteResponse>> future0 =
SettableFuture.create();
Status status = Status.newBuilder().setCode(Code.OK).build();
List<MessageQueue> messageQueueList = new ArrayList<>();
MessageQueue mq =
MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -102,9 +103,11 @@ public class ProducerImplTest extends TestBase {
.setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
.setId(0).build();
messageQueueList.add(mq);
- QueryRouteResponse response =
QueryRouteResponse.newBuilder().setStatus(status)
+ QueryRouteResponse resp =
QueryRouteResponse.newBuilder().setStatus(status)
.addAllMessageQueues(messageQueueList).build();
- future0.set(response);
+ final InvocationContext<QueryRouteResponse> invocationContext =
+ new InvocationContext<>(resp, fakeRpcContext());
+ future0.set(invocationContext);
when(clientManager.queryRoute(any(Endpoints.class),
any(Metadata.class), any(QueryRouteRequest.class),
any(Duration.class)))
.thenReturn(future0);
@@ -144,10 +147,11 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class),
any(Duration.class), any(TelemetrySession.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
- final ListenableFuture<SendMessageResponse> future =
okSendMessageResponseFutureWithSingleEntry();
+ final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+ okSendMessageResponseFutureWithSingleEntry();
when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
any(Duration.class))).thenReturn(future);
- final SendMessageResponse response = future.get();
+ final SendMessageResponse response = future.get().getResp();
assertEquals(1, response.getEntriesCount());
final apache.rocketmq.v2.SendResultEntry receipt =
response.getEntriesList().iterator().next();
final SendReceipt sendReceipt = producer.send(message);
@@ -163,10 +167,11 @@ public class ProducerImplTest extends TestBase {
verify(clientManager, never()).telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
any(TelemetrySession.class));
final Message message = fakeMessage(FAKE_TOPIC_0);
- final ListenableFuture<SendMessageResponse> future =
okSendMessageResponseFutureWithSingleEntry();
+ final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+ okSendMessageResponseFutureWithSingleEntry();
when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
any(Duration.class))).thenReturn(future);
- final SendMessageResponse response = future.get();
+ final SendMessageResponse response = future.get().getResp();
assertEquals(1, response.getEntriesCount());
final SendReceipt sendReceipt =
producerWithoutTopicBinding.send(message);
verify(clientManager, times(1)).queryRoute(any(Endpoints.class),
any(Metadata.class),
@@ -185,7 +190,7 @@ public class ProducerImplTest extends TestBase {
any(QueryRouteRequest.class), any(Duration.class));
verify(clientManager, times(1)).telemetry(any(Endpoints.class),
any(Metadata.class), any(Duration.class),
any(TelemetrySession.class));
- final ListenableFuture<SendMessageResponse> future =
failureSendMessageResponseFuture();
+ final ListenableFuture<InvocationContext<SendMessageResponse>> future
= failureSendMessageResponseFuture();
when(clientManager.sendMessage(any(Endpoints.class),
any(Metadata.class), any(SendMessageRequest.class),
any(Duration.class))).thenReturn(future);
Message message0 = fakeMessage(FAKE_TOPIC_0);
diff --git
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 212cb06..f3bcbb2 100644
---
a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++
b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -39,9 +39,11 @@ import apache.rocketmq.v2.SendMessageResponse;
import apache.rocketmq.v2.SendResultEntry;
import apache.rocketmq.v2.Status;
import apache.rocketmq.v2.SystemProperties;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
+import io.grpc.Metadata;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
@@ -72,6 +74,8 @@ import
org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcContext;
public class TestBase {
protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -137,6 +141,10 @@ public class TestBase {
return new Endpoints(fakePbEndpoints0());
}
+ protected RpcContext fakeRpcContext() {
+ return new RpcContext(fakeEndpoints(), new Metadata());
+ }
+
protected Message fakeMessage(String topic) {
return new
MessageBuilderImpl().setTopic(topic).setBody(RandomUtils.nextBytes(1)).build();
}
@@ -207,40 +215,35 @@ public class TestBase {
.setPermission(Permission.READ_WRITE).build();
}
- protected ListenableFuture<QueryRouteResponse>
okQueryRouteResponseFuture() {
- SettableFuture<QueryRouteResponse> future = SettableFuture.create();
+ protected ListenableFuture<InvocationContext<QueryRouteResponse>>
okQueryRouteResponseFuture() {
Status status = Status.newBuilder().setCode(Code.OK).build();
- final QueryRouteResponse response =
+ final QueryRouteResponse resp =
QueryRouteResponse.newBuilder().setStatus(status).addMessageQueues(fakePbMessageQueue0()).build();
- future.set(response);
- return future;
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<ChangeInvisibleDurationResponse>
okChangeInvisibleDurationFuture() {
- SettableFuture<ChangeInvisibleDurationResponse> future =
SettableFuture.create();
+ protected
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
+ okChangeInvisibleDurationCtxFuture() {
Status status = Status.newBuilder().setCode(Code.OK).build();
- final ChangeInvisibleDurationResponse response =
+ final ChangeInvisibleDurationResponse resp =
ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
- future.set(response);
- return future;
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<QueryAssignmentResponse>
okQueryAssignmentResponseFuture() {
+ protected ListenableFuture<InvocationContext<QueryAssignmentResponse>>
okQueryAssignmentResponseFuture() {
final SettableFuture<QueryAssignmentResponse> future =
SettableFuture.create();
final Status status = Status.newBuilder().setCode(Code.OK).build();
Assignment assignment =
Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build();
- QueryAssignmentResponse response =
QueryAssignmentResponse.newBuilder().setStatus(status)
+ QueryAssignmentResponse resp =
QueryAssignmentResponse.newBuilder().setStatus(status)
.addAssignments(assignment).build();
- future.set(response);
- return future;
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<QueryAssignmentResponse>
okEmptyQueryAssignmentResponseFuture() {
+ protected ListenableFuture<InvocationContext<QueryAssignmentResponse>>
okEmptyQueryAssignmentResponseFuture() {
final SettableFuture<QueryAssignmentResponse> future =
SettableFuture.create();
final Status status = Status.newBuilder().setCode(Code.OK).build();
- final QueryAssignmentResponse response =
QueryAssignmentResponse.newBuilder().setStatus(status).build();
- future.set(response);
- return future;
+ final QueryAssignmentResponse resp =
QueryAssignmentResponse.newBuilder().setStatus(status).build();
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
protected Map<String, FilterExpression>
createSubscriptionExpressions(String topic) {
@@ -250,53 +253,44 @@ public class TestBase {
return map;
}
- protected ListenableFuture<AckMessageResponse>
okAckMessageResponseFuture() {
+ protected ListenableFuture<InvocationContext<AckMessageResponse>>
okAckMessageResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
- SettableFuture<AckMessageResponse> future0 = SettableFuture.create();
- final AckMessageResponse response =
AckMessageResponse.newBuilder().setStatus(status).build();
- future0.set(response);
- return future0;
+ final AckMessageResponse resp =
AckMessageResponse.newBuilder().setStatus(status).build();
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<ChangeInvisibleDurationResponse>
okChangeInvisibleDurationResponseFuture(
- String receiptHandle) {
+ protected
ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
+ okChangeInvisibleDurationResponseFuture(String receiptHandle) {
final Status status = Status.newBuilder().setCode(Code.OK).build();
SettableFuture<ChangeInvisibleDurationResponse> future =
SettableFuture.create();
- ChangeInvisibleDurationResponse response =
ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
+ ChangeInvisibleDurationResponse resp =
ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
.setReceiptHandle(receiptHandle).build();
- future.set(response);
- return future;
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
- okForwardMessageToDeadLetterQueueResponseFuture() {
+ protected
ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
+ okForwardMessageToDeadLetterQueueResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
- SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
SettableFuture.create();
- final ForwardMessageToDeadLetterQueueResponse response =
+ final ForwardMessageToDeadLetterQueueResponse resp =
ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
- future0.set(response);
- return future0;
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<SendMessageResponse>
okSendMessageResponseFutureWithSingleEntry() {
+ protected ListenableFuture<InvocationContext<SendMessageResponse>>
okSendMessageResponseFutureWithSingleEntry() {
final Status status = Status.newBuilder().setCode(Code.OK).build();
- SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
final String messageId =
MessageIdCodec.getInstance().nextMessageId().toString();
SendResultEntry entry =
SendResultEntry.newBuilder().setMessageId(messageId)
.setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build();
- SendMessageResponse response =
SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
- future0.set(response);
- return future0;
+ SendMessageResponse resp =
SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
- protected ListenableFuture<SendMessageResponse>
failureSendMessageResponseFuture() {
+ protected ListenableFuture<InvocationContext<SendMessageResponse>>
failureSendMessageResponseFuture() {
final Status status =
Status.newBuilder().setCode(Code.FORBIDDEN).build();
- SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
SendResultEntry sendResultEntry =
SendResultEntry.newBuilder().setStatus(status).setStatus(status).build();
- SendMessageResponse response =
SendMessageResponse.newBuilder().setStatus(status)
+ SendMessageResponse resp =
SendMessageResponse.newBuilder().setStatus(status)
.addEntries(sendResultEntry).build();
- future0.set(response);
- return future0;
+ return Futures.immediateFuture(new InvocationContext<>(resp,
fakeRpcContext()));
}
protected ListenableFuture<SendMessageResponse>
okBatchSendMessageResponseFuture() {
@@ -326,11 +320,9 @@ public class TestBase {
.setSystemProperties(systemProperties).build();
}
- protected ListenableFuture<Iterator<ReceiveMessageResponse>>
okReceiveMessageResponsesFuture(String topic,
- int messageCount) {
+ protected
ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>>
okReceiveMessageResponsesFuture(
+ String topic, int messageCount) {
final Status status = Status.newBuilder().setCode(Code.OK).build();
- SettableFuture<Iterator<ReceiveMessageResponse>> future =
SettableFuture.create();
-
final apache.rocketmq.v2.Message message = fakePbMessage(topic);
List<ReceiveMessageResponse> responses = new ArrayList<>();
ReceiveMessageResponse statusResponse =
ReceiveMessageResponse.newBuilder().setStatus(status).build();
@@ -339,9 +331,7 @@ public class TestBase {
ReceiveMessageResponse messageResponse =
ReceiveMessageResponse.newBuilder().setMessage(message).build();
responses.add(messageResponse);
}
-
- future.set(responses.iterator());
- return future;
+ return Futures.immediateFuture(new
InvocationContext<>(responses.iterator(), fakeRpcContext()));
}
protected ListenableFuture<EndTransactionResponse>
okEndTransactionResponseFuture() {
@@ -369,8 +359,9 @@ public class TestBase {
protected SendReceiptImpl fakeSendReceiptImpl(
MessageQueueImpl mq) throws ExecutionException, InterruptedException,
ClientException {
- final ListenableFuture<SendMessageResponse> future =
okSendMessageResponseFutureWithSingleEntry();
- final SendMessageResponse response = future.get();
+ final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+ okSendMessageResponseFutureWithSingleEntry();
+ final SendMessageResponse response = future.get().getResp();
final List<SendReceiptImpl> receipts =
SendReceiptImpl.processSendResponse(mq, response);
return receipts.iterator().next();
}