[ https://issues.apache.org/jira/browse/DRILL-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15229728#comment-15229728 ]
ASF GitHub Bot commented on DRILL-3714:
---------------------------------------

Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/463#discussion_r58821796

--- Diff: exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java ---
@@ -20,51 +20,82 @@
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.exceptions.UserRemoteException;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 
+import com.carrotsearch.hppc.IntObjectHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.base.Preconditions;
+
 /**
- * Manages the creation of rpc futures for a particular socket.
+ * Manages the creation of rpc futures for a particular socket <--> socket
+ * connection. Generally speaking, there will be two threads working with this
+ * class (the socket thread and the Request generating thread). Synchronization
+ * is simple with the map being the only thing that is protected. Everything
+ * else works via Atomic variables.
  */
-public class CoordinationQueue {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
+class RequestIdMap {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestIdMap.class);
+
+  private final AtomicInteger value = new AtomicInteger();
+  private final AtomicBoolean acceptMessage = new AtomicBoolean(true);
 
-  private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
-  private final Map<Integer, RpcOutcome<?>> map;
+  /** Access to map must be protected. **/
+  private final IntObjectHashMap<RpcOutcome<?>> map;
 
-  public CoordinationQueue(int segmentSize, int segmentCount) {
-    map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f, segmentCount);
+  public RequestIdMap() {
+    map = new IntObjectHashMap<RpcOutcome<?>>();
   }
 
   void channelClosed(Throwable ex) {
+    acceptMessage.set(false);
     if (ex != null) {
-      RpcException e;
-      if (ex instanceof RpcException) {
-        e = (RpcException) ex;
-      } else {
-        e = new RpcException(ex);
+      final RpcException e = RpcException.mapException(ex);
+      synchronized (map) {
+        map.forEach(new Closer(e));
+        map.clear();
       }
-      for (RpcOutcome<?> f : map.values()) {
-        f.setException(e);
+    }
+  }
+
+  private class Closer implements IntObjectProcedure<RpcOutcome<?>> {
+    final RpcException exception;
+
+    public Closer(RpcException exception) {
+      this.exception = exception;
+    }
+
+    @Override
+    public void apply(int key, RpcOutcome<?> value) {
+      try{
+        value.setException(exception);
+      }catch(Exception e){
+        logger.warn("Failure while attempting to fail rpc response.", e);
       }
     }
+  }
 
-  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection) {
-    int i = circularInt.getNext();
+  public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V> handler, Class<V> clazz,
+      RemoteConnection connection) {
+    int i = value.incrementAndGet();
     RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
-    Object old = map.put(i, future);
-    if (old != null) {
-      throw new IllegalStateException(
-          "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
+    final Object old;
+    synchronized (map) {
+      Preconditions.checkArgument(acceptMessage.get(),
--- End diff --

Make this check first statement in the method?
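For illustration, a minimal self-contained sketch of that reordering, not Drill's actual code: SettableFuture stands in for Drill's RpcOutcome/RpcListener types, the class and method names are hypothetical, and the message strings are placeholders since the real checkArgument message is truncated in the diff above.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;

// Simplified stand-in for RequestIdMap, with the suggested fail-fast check
// hoisted to the first statement of the method.
class RequestIdMapSketch {
  private final AtomicInteger value = new AtomicInteger();
  private final AtomicBoolean acceptMessage = new AtomicBoolean(true);

  /** Access to map must be protected. */
  private final Map<Integer, SettableFuture<Object>> map = new HashMap<>();

  SettableFuture<Object> createNewRpcFuture() {
    // The suggested reordering: reject before generating an id or allocating
    // the future. The early check alone could race with channelClosed(), so
    // the authoritative check stays inside the synchronized block, as in the
    // patch.
    Preconditions.checkArgument(acceptMessage.get(), "connection closed"); // placeholder message
    final int i = value.incrementAndGet();
    final SettableFuture<Object> future = SettableFuture.create();
    final SettableFuture<Object> old;
    synchronized (map) {
      Preconditions.checkArgument(acceptMessage.get(), "connection closed"); // placeholder message
      old = map.put(i, future);
    }
    // Mirrors the pre-patch IllegalStateException for a reused coordination id.
    Preconditions.checkState(old == null, "coordination id %s was reused", i);
    return future;
  }

  void channelClosed(Throwable ex) {
    acceptMessage.set(false);
    if (ex != null) {
      synchronized (map) {
        for (SettableFuture<Object> f : map.values()) {
          f.setException(ex); // fail every pending rpc so no caller waits forever
        }
        map.clear();
      }
    }
  }
}
{code}

The repeated check under the lock matters: with only the hoisted check, channelClosed() could fail and clear the map between the check and the put, leaving a registered future that is never completed.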
> Query runs out of memory and remains in CANCELLATION_REQUESTED state until drillbit is restarted
> ------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-3714
>                 URL: https://issues.apache.org/jira/browse/DRILL-3714
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 1.2.0
>            Reporter: Victoria Markman
>            Assignee: Jacques Nadeau
>            Priority: Critical
>             Fix For: 1.7.0
>
>         Attachments: Screen Shot 2015-08-26 at 10.36.33 AM.png, drillbit.log, jstack.txt, query_profile_2a2210a7-7a78-c774-d54c-c863d0b77bb0.json
>
>
> This is a variation of DRILL-3705, the difference being Drill's behavior when hitting the OOM condition.
> The query runs out of memory during execution and remains in CANCELLATION_REQUESTED state until the drillbit is bounced.
> The client (sqlline in this case) never gets a response from the server.
> Reproduction details:
> Single-node drillbit installation.
> DRILL_MAX_DIRECT_MEMORY="8G"
> DRILL_HEAP="4G"
> Run this query on the TPC-DS SF100 data set:
> {code}
> SELECT SUM(ss.ss_net_paid_inc_tax) OVER (PARTITION BY ss.ss_store_sk) AS TotalSpend
> FROM store_sales ss
> WHERE ss.ss_store_sk IS NOT NULL
> ORDER BY 1
> LIMIT 10;
> {code}
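To see why the patch above resolves this hang, here is a toy sketch, again not Drill code (SettableFuture stands in for RpcOutcome, and the class name is hypothetical), of the cleanup RequestIdMap performs when the data channel dies, using the same HPPC forEach/procedure shape as the patch's Closer. Every registered future is failed, so callers blocked on an rpc observe the error instead of the query sitting in CANCELLATION_REQUESTED indefinitely.

{code}
import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.procedures.IntObjectProcedure;
import com.google.common.util.concurrent.SettableFuture;

public class PendingRpcCleanupDemo {
  public static void main(String[] args) {
    // Two "in-flight" rpcs keyed by coordination id.
    final IntObjectHashMap<SettableFuture<String>> pending = new IntObjectHashMap<SettableFuture<String>>();
    pending.put(1, SettableFuture.<String>create());
    pending.put(2, SettableFuture.<String>create());

    // Simulated channel death (cf. the DecoderException in the log below).
    final Exception cause = new Exception("Direct buffer memory");

    // Same shape as the patch's Closer: fail every pending outcome, then clear.
    synchronized (pending) {
      pending.forEach(new IntObjectProcedure<SettableFuture<String>>() {
        @Override
        public void apply(int key, SettableFuture<String> value) {
          value.setException(cause);
        }
      });
      pending.clear();
    }
  }
}
{code}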
> drillbit.log
> {code}
> 2015-08-26 16:54:58,469 [2a2210a7-7a78-c774-d54c-c863d0b77bb0:frag:3:22] INFO o.a.d.e.w.f.FragmentStatusReporter - 2a2210a7-7a78-c774-d54c-c863d0b77bb0:3:22: State to report: RUNNING
> 2015-08-26 16:55:50,498 [BitServer-5] WARN o.a.drill.exec.rpc.data.DataServer - Message of mode REQUEST of rpc type 3 took longer than 500ms. Actual duration was 2569ms.
> 2015-08-26 16:56:31,086 [BitServer-5] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication. Connection: /10.10.88.133:31012 <--> /10.10.88.133:54554 (data server). Closing connection.
> io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:233) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [netty-common-4.0.27.Final.jar:4.0.27.Final]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_71]
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[na:1.7.0_71]
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) ~[na:1.7.0_71]
>         at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:437) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:179) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PoolArena.allocate(PoolArena.java:168) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:280) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:110) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:600) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.UnsafeDirectLittleEndian.writeBytes(UnsafeDirectLittleEndian.java:28) ~[drill-java-exec-1.2.0-SNAPSHOT.jar:4.0.27.Final]
>         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:227) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         ... 11 common frames omitted
> 2015-08-26 16:56:31,087 [BitServer-5] INFO o.a.d.exec.rpc.ProtobufLengthDecoder - Channel is closed, discarding remaining 124958 byte(s) in buffer.
> 2015-08-26 16:56:31,087 [BitClient-1] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication. Connection: /10.10.88.133:54554 <--> /10.10.88.133:31012 (data client). Closing connection.
> java.io.IOException: syscall:read(...)() failed: Connection reset by peer
> 2015-08-26 16:56:31,088 [BitClient-1] INFO o.a.drill.exec.rpc.data.DataClient - Channel closed /10.10.88.133:54554 <--> /10.10.88.133:31012.
> 2015-08-26 16:56:35,325 [BitServer-6] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception in RPC communication. Connection: /10.10.88.133:31012 <--> /10.10.88.133:54555 (data server). Closing connection.
> io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:233) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847) [netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [netty-common-4.0.27.Final.jar:4.0.27.Final]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> {code}
> Attached:
> query_profile_2a2210a7-7a78-c774-d54c-c863d0b77bb0.json
> drillbit.log
> jstack.txt

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)