[ https://issues.apache.org/jira/browse/GIRAPH-1137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934816#comment-15934816 ]
ASF GitHub Bot commented on GIRAPH-1137: ---------------------------------------- Github user majakabiljo commented on a diff in the pull request: https://github.com/apache/giraph/pull/26#discussion_r107200544 --- Diff: giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java --- @@ -215,10 +231,38 @@ public void run() { } } }); - thread.setUncaughtExceptionHandler(exceptionHandler); - thread.setName("resume-sender"); - thread.setDaemon(true); - thread.start(); + resumeHandlerThread.setUncaughtExceptionHandler(exceptionHandler); + resumeHandlerThread.setName("resume-sender"); + resumeHandlerThread.setDaemon(true); + resumeHandlerThread.start(); + + // Thread to handle/send cached requests + Thread cachedRequestHandlerThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + Pair<Integer, WritableRequest> pair = null; + try { + pair = toBeSent.take(); + } catch (InterruptedException e) { + throw new IllegalStateException("run: failed while waiting to " + + "take an element from the request queue!", e); + } + int taskId = pair.getLeft(); + WritableRequest request = pair.getRight(); + nettyClient.doSend(taskId, request); + if (aggregateUnsentRequests.decrementAndGet() == 0) { + synchronized (aggregateUnsentRequests) { + aggregateUnsentRequests.notifyAll(); + } + } + } + } + }); + cachedRequestHandlerThread.setUncaughtExceptionHandler(exceptionHandler); + cachedRequestHandlerThread.setName("cached-req-sender"); + cachedRequestHandlerThread.setDaemon(true); + cachedRequestHandlerThread.start(); --- End diff -- You could create a utility method, such as ThreadUtils.startThread, that sets the thread's name, daemon flag and uncaught-exception handler and then starts it, instead of repeating these calls for each thread. 
> Remove channel probing from Netty worker thread for credit-based flow-control > ----------------------------------------------------------------------------- > > Key: GIRAPH-1137 > URL: https://issues.apache.org/jira/browse/GIRAPH-1137 > Project: Giraph > Issue Type: Bug > Reporter: Hassan Eslami > Assignee: Hassan Eslami > > In credit-based flow control, client threads (one type of Netty > worker thread used in Giraph) sometimes try to send requests to other workers. This is > bad practice for Netty and can cause Netty to flag the execution as > deadlock-prone (an example exception is shown below). Client threads should only > be responsible for sending ACK/NACK messages in response to requests, and > they should do so by reusing the channel on which they received the > request. In the current implementation, client threads may try to send > unsent/cached requests in credit-based flow control. Sending such requests > should be delegated to other threads. > WARN 2017-03-08 06:06:22,104 [netty-client-worker-3] .... 
> io.netty.util.concurrent.BlockingOperationException: > DefaultChannelPromise@2c455378(incomplete) > at > io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:383) > at > io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157) > at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:343) > at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:259) > at > org.apache.giraph.utils.ProgressableUtils$ChannelFutureWaitable.waitFor(ProgressableUtils.java:461) > at > org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:214) > at > org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:180) > at > org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:165) > at > org.apache.giraph.utils.ProgressableUtils.awaitChannelFuture(ProgressableUtils.java:132) > at > org.apache.giraph.comm.netty.NettyClient.getNextChannel(NettyClient.java:715) > at > org.apache.giraph.comm.netty.NettyClient.writeRequestToChannel(NettyClient.java:799) > at org.apache.giraph.comm.netty.NettyClient.doSend(NettyClient.java:789) > at > org.apache.giraph.comm.flow_control.CreditBasedFlowControl.trySendCachedRequests(CreditBasedFlowControl.java:515) > at > org.apache.giraph.comm.flow_control.CreditBasedFlowControl.messageAckReceived(CreditBasedFlowControl.java:485) > at > org.apache.giraph.comm.netty.NettyClient.messageReceived(NettyClient.java:840) > at > org.apache.giraph.comm.netty.handler.ResponseClientHandler.channelRead(ResponseClientHandler.java:87) > at > io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338) > at > io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324) > at > io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153) > at > 
io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338) > at > io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324) > at > org.apache.giraph.comm.netty.InboundByteCounter.channelRead(InboundByteCounter.java:89) > at > io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338) > at > io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324) > at > io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785) > at > io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126) > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485) > at > io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452) > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346) > at > io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101) > at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)