[ 
https://issues.apache.org/jira/browse/GIRAPH-1137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934816#comment-15934816
 ] 

ASF GitHub Bot commented on GIRAPH-1137:
----------------------------------------

Github user majakabiljo commented on a diff in the pull request:

    https://github.com/apache/giraph/pull/26#discussion_r107200544
  
    --- Diff: giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java ---
    @@ -215,10 +231,38 @@ public void run() {
             }
           }
         });
    -    thread.setUncaughtExceptionHandler(exceptionHandler);
    -    thread.setName("resume-sender");
    -    thread.setDaemon(true);
    -    thread.start();
    +    resumeHandlerThread.setUncaughtExceptionHandler(exceptionHandler);
    +    resumeHandlerThread.setName("resume-sender");
    +    resumeHandlerThread.setDaemon(true);
    +    resumeHandlerThread.start();
    +
    +    // Thread to handle/send cached requests
    +    Thread cachedRequestHandlerThread = new Thread(new Runnable() {
    +      @Override
    +      public void run() {
    +        while (true) {
    +          Pair<Integer, WritableRequest> pair = null;
    +          try {
    +            pair = toBeSent.take();
    +          } catch (InterruptedException e) {
    +            throw new IllegalStateException("run: failed while waiting to " +
    +                "take an element from the request queue!", e);
    +          }
    +          int taskId = pair.getLeft();
    +          WritableRequest request = pair.getRight();
    +          nettyClient.doSend(taskId, request);
    +          if (aggregateUnsentRequests.decrementAndGet() == 0) {
    +            synchronized (aggregateUnsentRequests) {
    +              aggregateUnsentRequests.notifyAll();
    +            }
    +          }
    +        }
    +      }
    +    });
    +    cachedRequestHandlerThread.setUncaughtExceptionHandler(exceptionHandler);
    +    cachedRequestHandlerThread.setName("cached-req-sender");
    +    cachedRequestHandlerThread.setDaemon(true);
    +    cachedRequestHandlerThread.start();
    --- End diff --
    
    You can create a utility method like ThreadUtils.startThread that also takes the exception handler.
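
A minimal sketch of what such a helper could look like, since both threads in the diff repeat the same four setup calls. The class name follows the suggestion above, but the exact signature (runnable, thread name, exception handler) and the choice to always create a daemon thread are assumptions for illustration, not the code that was committed.

```java
/**
 * Hypothetical helper sketched from the review suggestion.
 * The method signature is an assumption, not Giraph's actual API.
 */
final class ThreadUtils {
  private ThreadUtils() { }

  /**
   * Creates a daemon thread with the given name and uncaught-exception
   * handler, starts it, and returns it to the caller.
   */
  static Thread startThread(Runnable runnable, String threadName,
      Thread.UncaughtExceptionHandler exceptionHandler) {
    Thread thread = new Thread(runnable, threadName);
    // The handler must be set before start() so that no failure is missed.
    thread.setUncaughtExceptionHandler(exceptionHandler);
    // Daemon threads do not keep the JVM alive, matching the diff above.
    thread.setDaemon(true);
    thread.start();
    return thread;
  }
}
```

With a helper of this shape, each of the two threads in the diff could be started in one call, for example `ThreadUtils.startThread(runnable, "cached-req-sender", exceptionHandler)`.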


> Remove channel probing from Netty worker thread for credit-based flow-control
> -----------------------------------------------------------------------------
>
>                 Key: GIRAPH-1137
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-1137
>             Project: Giraph
>          Issue Type: Bug
>            Reporter: Hassan Eslami
>            Assignee: Hassan Eslami
>
> In credit-based flow-control, sometimes, client threads (one type of Netty 
> worker threads used in Giraph) try to send requests to other workers. This is 
> bad practice for Netty and can cause Netty to mark the execution as 
> deadlock-prone (an example exception shown below). Client threads should only 
> be responsible for sending ACK/NACK messages in response to requests, and 
> they should do so by reusing the channel from which they received the 
> request. In the current implementation, client threads may try to send 
> unsent/cached requests in credit-based flow control. Sending such requests 
> should be delegated to other threads.
> WARN 2017-03-08 06:06:22,104 [netty-client-worker-3] ....
> io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@2c455378(incomplete)
> at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:383)
> at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157)
> at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:343)
> at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:259)
> at org.apache.giraph.utils.ProgressableUtils$ChannelFutureWaitable.waitFor(ProgressableUtils.java:461)
> at org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:214)
> at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:180)
> at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:165)
> at org.apache.giraph.utils.ProgressableUtils.awaitChannelFuture(ProgressableUtils.java:132)
> at org.apache.giraph.comm.netty.NettyClient.getNextChannel(NettyClient.java:715)
> at org.apache.giraph.comm.netty.NettyClient.writeRequestToChannel(NettyClient.java:799)
> at org.apache.giraph.comm.netty.NettyClient.doSend(NettyClient.java:789)
> at org.apache.giraph.comm.flow_control.CreditBasedFlowControl.trySendCachedRequests(CreditBasedFlowControl.java:515)
> at org.apache.giraph.comm.flow_control.CreditBasedFlowControl.messageAckReceived(CreditBasedFlowControl.java:485)
> at org.apache.giraph.comm.netty.NettyClient.messageReceived(NettyClient.java:840)
> at org.apache.giraph.comm.netty.handler.ResponseClientHandler.channelRead(ResponseClientHandler.java:87)
> at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
> at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153)
> at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
> at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
> at org.apache.giraph.comm.netty.InboundByteCounter.channelRead(InboundByteCounter.java:89)
> at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
> at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
> at java.lang.Thread.run(Thread.java:745)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
