AlbumenJ commented on issue #13088:
URL: https://github.com/apache/dubbo/issues/13088#issuecomment-1756613966

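   > > > In my view, this is related to Dubbo's consumer-side thread model and to the consumer's request-timeout check. The Dubbo consumer uses a HashedWheelTimer to check whether a request has timed out. The consumer-side thread model is as follows:
   > > > ![Screenshot](https://user-images.githubusercontent.com/29769524/273807829-ec4c28e7-8b3f-48c1-83c3-ff33f2aa47ba.png)
   > > > The Biz Thread is woken up at step 2 in one of two scenarios: 1. The HashedWheelTimer detects that the request has timed out. 2. The provider returns a result and the consumer processes it correctly. Consider the first scenario. Before the request is sent to the provider in step 1, Dubbo creates a TimeoutCheckTask to check for a timeout, and then sends the request to the provider. The code is as follows: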
   > > > ```java
   > > >     @Override
   > > >     public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
   > > >         if (closed) {
   > > >             throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
   > > >         }
   > > >         Request req;
   > > >         if (request instanceof Request) {
   > > >             req = (Request) request;
   > > >         } else {
   > > >             // create request.
   > > >             req = new Request();
   > > >             req.setVersion(Version.getProtocolVersion());
   > > >             req.setTwoWay(true);
   > > >             req.setData(request);
   > > >         }
   > > >         DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
   > > >         try {
   > > >             channel.send(req);
   > > >         } catch (RemotingException e) {
   > > >             future.cancel();
   > > >             throw e;
   > > >         }
   > > >         return future;
   > > >     }
   > > > ```
   > > > 
   > > > The call `DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor)` creates a TimeoutCheckTask and submits it to the HashedWheelTimer. The code is as follows:
   > > > ```java
   > > >     public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
   > > >         final DefaultFuture future = new DefaultFuture(channel, request, timeout);
   > > >         future.setExecutor(executor);
   > > >         // timeout check
   > > >         timeoutCheck(future);
   > > >         return future;
   > > >     }
   > > > ```
   > > > 
   > > > ```java
   > > >     /**
   > > >      * check time out of the future
   > > >      */
   > > >     private static void timeoutCheck(DefaultFuture future) {
   > > >         TimeoutCheckTask task = new TimeoutCheckTask(future.getId());
   > > >         future.timeoutCheckTask = TIME_OUT_TIMER.get().newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
   > > >     }
   > > > ```
   > > > 
   > > > The body of the newTimeout method is as follows:
   > > > ```java
   > > >     @Override
   > > >     public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
   > > >         if (task == null) {
   > > >             throw new NullPointerException("task");
   > > >         }
   > > >         if (unit == null) {
   > > >             throw new NullPointerException("unit");
   > > >         }
   > > > 
   > > >         long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
   > > > 
   > > >         if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
   > > >             pendingTimeouts.decrementAndGet();
   > > >             throw new RejectedExecutionException("Number of pending timeouts ("
   > > >                 + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
   > > >                 + "timeouts (" + maxPendingTimeouts + ")");
   > > >         }
   > > > 
   > > >         start();
   > > > 
   > > >         // Add the timeout to the timeout queue which will be processed on the next tick.
   > > >         // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
   > > >         long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
   > > > 
   > > >         // Guard against overflow.
   > > >         if (delay > 0 && deadline < 0) {
   > > >             deadline = Long.MAX_VALUE;
   > > >         }
   > > >         HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
   > > >         timeouts.add(timeout);
   > > >         return timeout;
   > > >     }
   > > > ```
   > > > 
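   > > > Suppose the clock moves backwards between the two statements `start();` and `long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;`, so that deadline < 0. In that case the TimeoutCheckTask will never run the timeout check for this request. In other words, the first scenario in which the Biz Thread is woken up can no longer occur.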
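   > > > Now for the second scenario. Let's look at the consumer-side thread model again:
   > > > ![Screenshot](https://user-images.githubusercontent.com/29769524/273807829-ec4c28e7-8b3f-48c1-83c3-ff33f2aa47ba.png)
   > > > While the Biz Thread is waiting at step 2, suppose the provider has received the request and started its business logic, but crashes during processing. The consumer then never receives a result from the provider, so the second wake-up scenario cannot occur either. Nothing wakes the Biz Thread, and it hangs.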
   > > 
   > > 
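   > > The root cause is actually that ThreadlessExecutor in early 3.2 releases could wait indefinitely. Normally, during the AsyncToSync wait, the future's timeout ends the wait automatically. Because of an optimization, however, the ThreadlessExecutor's executor might never finish, so the timeout ultimately did not take effect.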
   > 
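   > We are currently on version 3.2.0. Can I upgrade only some of the services under one registry to 3.2.6? Are there any hidden risks, or must all of them be upgraded to 3.2.6 at the same time?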
   
   Upgrading in batches is fine.
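
As a small aside on the deadline arithmetic in the quoted `newTimeout`: the sketch below copies only that calculation and takes the clock readings as parameters, so the premise of the quoted analysis (a reading that falls behind `startTime`) can be simulated. `DeadlineSketch` and `computeDeadline` are hypothetical names, not Dubbo APIs, and whether such a reading can occur in practice is an assumption of that analysis, not something this sketch establishes.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Dubbo: isolates the deadline arithmetic
// from the quoted newTimeout so it can be exercised with chosen clock values.
final class DeadlineSketch {

    // now and startTime stand in for System.nanoTime() readings (nanoseconds).
    static long computeDeadline(long now, long startTime, long delay, TimeUnit unit) {
        long deadline = now + unit.toNanos(delay) - startTime;
        // Same guard as in the quoted code: a positive delay combined with a
        // negative deadline is treated as overflow and pushed to the far future.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        return deadline;
    }

    public static void main(String[] args) {
        long start = TimeUnit.SECONDS.toNanos(5);

        // Normal case: the reading equals startTime, so the deadline is the delay.
        System.out.println(computeDeadline(start, start, 1000, TimeUnit.MILLISECONDS));

        // Simulated premise: the reading is 2 s behind startTime with a 1 s delay.
        long behind = start - TimeUnit.SECONDS.toNanos(2);
        System.out.println(computeDeadline(behind, start, 1000, TimeUnit.MILLISECONDS));
    }
}
```

With these inputs the first call yields a deadline of one second in nanoseconds, while the second hits the guard and yields `Long.MAX_VALUE`, a deadline that is effectively never reached.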

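To illustrate the difference between a bounded and an unbounded wait mentioned in the quoted root-cause explanation, here is a minimal sketch that uses only `java.util.concurrent.CompletableFuture`. It does not reproduce ThreadlessExecutor or AsyncToSync; `BoundedWaitSketch` and `awaitResult` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration, not Dubbo code: a caller that bounds its wait
// returns even when nothing ever completes the future.
final class BoundedWaitSketch {

    static String awaitResult(CompletableFuture<String> future, long timeoutMillis) {
        try {
            // Bounded wait: gives up after timeoutMillis instead of blocking forever.
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMED_OUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // No response ever arrives, as when the provider crashes mid-request.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(awaitResult(neverCompleted, 100)); // prints "TIMED_OUT"

        // A response that has already arrived is returned immediately.
        System.out.println(awaitResult(CompletableFuture.completedFuture("ok"), 100)); // prints "ok"
    }
}
```

Calling `neverCompleted.get()` without a timeout would block the calling thread indefinitely, which is the kind of hang the thread describes.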

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

