gortiz commented on code in PR #17667:
URL: https://github.com/apache/pinot/pull/17667#discussion_r2783842155
##########
pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java:
##########
@@ -92,12 +92,18 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
LOGGER.error("Caught exception while deserializing data table of size: {} from server: {}", responseSize,
_serverRoutingInstance, e);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.DATA_TABLE_DESERIALIZATION_EXCEPTIONS, 1);
+ // Propagate so the pipeline closes the channel and channelInactive runs (markServerDown), otherwise
+ // the query would hang waiting for a response that will never be delivered.
+ ctx.fireExceptionCaught(e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("Caught exception while handling response from server: {}", _serverRoutingInstance, cause);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESPONSE_FETCH_EXCEPTIONS, 1);
+ // Propagate so the pipeline can close the channel and release resources; otherwise the channel
+ // may be left in an inconsistent state and leak.
+ ctx.fireExceptionCaught(cause);
Review Comment:
This method is called when an upstream handler runs out of memory, so we need
to make sure that memory is released:
```
ERROR [DataTableHandler] [nioEventLoopGroup-x-y] Caught exception while handling response from server: ZZZZZZ
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate X byte(s) of direct memory (used: Y, max: Z)
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:880)
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:809)
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:718)
at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:707)
at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:224)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:142)
at io.netty.buffer.PoolArena.reallocate(PoolArena.java:317)
at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:123)
at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:305)
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:280)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1103)
at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:105)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:288)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1519)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1377)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1428)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
```
Here I decided to close the channel. I'm open to alternatives, but we do
need to do something about this case.
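
To make the intent concrete, here is a minimal sketch of the close-on-error idea in plain Java, with no Netty types. Everything in it is a hypothetical stand-in and not Pinot or Netty API: `ServerConnection` plays the role of the channel, its private `onInactive` plays the role of `channelInactive`, and the static `exceptionCaught` plays the role of the handler method under review. In the real handler the equivalent step would presumably be a call to `ctx.close()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseOnErrorSketch {

  /** Hypothetical stand-in for a broker-to-server connection; not a Netty type. */
  public static final class ServerConnection {
    private final AtomicBoolean _open = new AtomicBoolean(true);
    // The query waiting on this connection; it must never be left incomplete.
    private final CompletableFuture<byte[]> _pendingResponse = new CompletableFuture<>();

    public CompletableFuture<byte[]> pendingResponse() {
      return _pendingResponse;
    }

    public boolean isOpen() {
      return _open.get();
    }

    /** Idempotent close: only the first call runs the inactive callback. */
    public void close(Throwable reason) {
      if (_open.compareAndSet(true, false)) {
        onInactive(reason);
      }
    }

    // Plays the role of channelInactive: fail the waiting query instead of
    // letting it hang on a response that will never arrive.
    private void onInactive(Throwable reason) {
      _pendingResponse.completeExceptionally(reason);
    }
  }

  /** Models the handler's exceptionCaught: record the error, then close. */
  public static void exceptionCaught(ServerConnection connection, Throwable cause) {
    System.err.println("Caught exception while handling response: " + cause);
    connection.close(cause);
  }

  public static void main(String[] args) {
    ServerConnection connection = new ServerConnection();
    // Simulated failure; the real error in the trace above is Netty's OutOfDirectMemoryError.
    exceptionCaught(connection, new OutOfMemoryError("simulated direct memory exhaustion"));
    System.out.println("open=" + connection.isOpen());
    System.out.println("queryFailed=" + connection.pendingResponse().isCompletedExceptionally());
  }
}
```

The point of the sketch is the ordering: the error is recorded first, then the connection is closed exactly once, and closing is what fails the pending query. Whether closing also frees the direct memory held by upstream decoders depends on how those handlers clean up when they are removed, which this model does not cover.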