GitHub user Breathtak created a discussion: Some Quest for RPC Demo Test
Location: rpc folder under the rocketmq-example directory
When I run the main method of RequestProducer, it throws the following
exception:
> org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006 DESC: send request message to <RequestTopic> OK, but wait reply message timeout, 3000 ms.
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762)
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633)
>     at org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810)
>     at org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47)
I then changed the TTL, but the result was the same, so I added a few log
statements. The output is as follows:
> request create RequestResponseFuture and correlationId : 104fa2e7-468b-4eb8-a932-a7214f2c1c08
> request remove RequestResponseFuture
> org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006 DESC: send request message to <RequestTopic> OK, but wait reply message timeout, 3000 ms.
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762)
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633)
>     at org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810)
>     at org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47)
> processReplyMessage set Reply correlationId: 104fa2e7-468b-4eb8-a932-a7214f2c1c08
I found that the problem is the call to
requestResponseFuture.acquireCountDownLatch(). It ends the blocking wait early,
so by the time processReplyMessage is executed, the RequestResponseFuture has
already been removed:
> @Override
> public void onSuccess(SendResult sendResult) {
>     requestResponseFuture.setSendRequestOk(true);
>     requestResponseFuture.acquireCountDownLatch();
> }
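To make the suspected race easier to reason about, here is a minimal, self-contained sketch. It is not RocketMQ code: `LatchRaceSketch`, `ReplyFuture`, `waitReply`, `putReply` and `releaseEarly` are hypothetical names, and the sketch assumes that the send-success callback counts down the same latch the requester is waiting on, which is what the logs above suggest but which I have not confirmed for every version.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the behaviour described above; not the RocketMQ classes.
final class LatchRaceSketch {

    /** Stand-in for a request/response future guarded by a single latch. */
    static final class ReplyFuture {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String reply;

        // Blocks until the latch is released or the timeout elapses,
        // then returns whatever reply has been stored (possibly null).
        String waitReply(long timeoutMillis) throws InterruptedException {
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return reply;
        }

        // Called when the reply message arrives.
        void putReply(String value) {
            reply = value;
            latch.countDown();
        }

        // Assumption: models a send-success callback that releases the latch.
        void releaseEarly() {
            latch.countDown();
        }
    }

    /** Returns the reply seen by the requester, or null if it gave up without one. */
    static String simulate(boolean releaseOnSendSuccess) {
        ReplyFuture future = new ReplyFuture();

        // The "consumer" replies 200 ms after the request was sent.
        Thread replier = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            future.putReply("reply message contents.");
        });
        replier.start();

        if (releaseOnSendSuccess) {
            future.releaseEarly(); // send callback fires before the reply exists
        }

        try {
            String result = future.waitReply(3000);
            replier.join(); // let the late reply finish before returning
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("early release:    " + simulate(true));
        System.out.println("no early release: " + simulate(false));
    }
}
```

In this model the first call should come back with no reply, because the wait ends as soon as the send succeeds and the reply only arrives afterwards. The second call waits for the reply, up to the timeout.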
So I removed the requestResponseFuture.acquireCountDownLatch() call, so that
the request waits for the reply for up to the TTL. The result is as follows:
> request create RequestResponseFuture and correlationId : a97c46d2-dabc-4345-afa9-5af1f5f46ed3
> processReplyMessage set Reply correlationId: a97c46d2-dabc-4345-afa9-5af1f5f46ed3
> request remove RequestResponseFuture
> request message: reply message contents.
> request to <RequestTopic> cost: 1017 replyMessage: MessageExt
> [brokerName=null, queueId=0, storeSize=0, queueOffset=0, sysFlag=0,
> bornTimestamp=1754470419554, bornHost=/221.6.33.116:43645,
> storeTimestamp=1754470421504, storeHost=/129.211.50.247:10911, msgId=null,
> commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0,
> toString()=Message{topic='DefaultCluster_REPLY_TOPIC', flag=0,
> properties={ARRIVE_TIME=1754470419570, PUSH_REPLY_TIME=1754470421504,
> MSG_REGION=DefaultRegion, UNIQ_KEY=C0A81E111A8063947C6B1D5FD4620007,
> CORRELATION_ID=a97c46d2-dabc-4345-afa9-5af1f5f46ed3, MSG_TYPE=reply,
> TTL=3000, REPLY_TO_CLIENT=192.168.30.17@24756#779130303594900,
> TRACE_ON=true}, body=[114, 101, 112, 108, 121, 32, 109, 101, 115, 115, 97,
> 103, 101, 32, 99, 111, 110, 116, 101, 110, 116, 115, 46],
> transactionId='null'}]
Question: is there a problem with calling
requestResponseFuture.acquireCountDownLatch() in onSuccess?
GitHub link: https://github.com/apache/rocketmq/discussions/9600