Can you elaborate a bit more? While idleness is not what we’re seeing now, it could perhaps become an issue later on. How would a partition going idle result in state buildup?
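For context, my understanding of the idleness mechanism from the linked docs is that it is configured on the watermark strategy passed to the source, not on the source itself. A rough sketch (the `brokers` variable and `input-topic` name are placeholders, and this assumes the DataStream `KafkaSource` API):

```java
// Sketch only: marks the source idle if a partition produces no records
// for 1 minute, so watermarks downstream can still advance without it.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)      // placeholder
        .setTopics("input-topic")          // placeholder
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream = env.fromSource(
        source,
        WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1)),
        "kafka-source");
```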
Thanks,
Vishal

On 25 Jan 2023 at 9:14 PM +0530, Martijn Visser <martijnvis...@apache.org>, wrote:
> Hi Vishal,
>
> Could idleness be an issue? I could see that if idleness occurs and the Kafka
> Source does not go into an idle state, more internal state (to commit Kafka
> transactions) can build up over time that ultimately causes an out-of-memory
> problem. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness
> for more details on this.
>
> Best regards,
>
> Martijn
>
>
> On Mon 23 Jan 2023 at 10:53, Vishal Surana <vis...@moengage.com> wrote:
> > Could it be that Flink is unable to commit offsets to Kafka? I know that
> > Flink's checkpoint mechanism isn't tied to its ability to commit offsets,
> > but at the same time, we've seen that the job can take hours to commit
> > offsets while checkpoints go through successfully during that period. But
> > with Kafka transactions enabled, the offset commit is now required to
> > happen.
> >
> > Thanks,
> > Vishal
> > On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vis...@moengage.com>, wrote:
> > > My job runs fine when running without Kafka transactions. The source
> > > and sink are Kafka in my job, with a couple of RocksDB-based stateful
> > > operators taking 100GB each.
> > >
> > > When I enable Kafka transactions, things go well initially and we can
> > > see high throughput as well. However, after a few hours, the job seems
> > > to get stuck as it's unable to commit the transaction, due to which
> > > it's unable to consume any more messages, since we've enabled exactly-once
> > > processing with unaligned checkpoints.
> > > The number of hours it takes
> > > might vary, but it always happens and eventually the job crashes with
> > > this exception:
> > >
> > > ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught
> > > exception in thread 'kafka-producer-network-thread |
> > > producer-TRANSACTION_ID_PREFIX-1-17060':
> > > java.lang.OutOfMemoryError: Direct buffer memory
> > >     at java.nio.Bits.reserveMemory(Bits.java:175)
> > >     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> > >     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> > >     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> > >     at sun.nio.ch.IOUtil.write(IOUtil.java:164)
> > >     at sun.nio.ch.IOUtil.write(IOUtil.java:130)
> > >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
> > >     at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
> > >     at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
> > >     at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
> > >     at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
> > >     at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
> > >     at org.apache.kafka.common.network.Selector.write(Selector.java:644)
> > >     at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
> > >     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
> > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> > >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
> > >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
> > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
> > >     at java.lang.Thread.run(Thread.java:829)
> > >
> > > What seems to be happening all of a sudden? Any suggestions on how to
> > > fix it?
> > >
> > > --
> > > Regards,
> > > Vishal
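For anyone reading this thread later: the `Direct buffer memory` error in the trace is thrown from `Bits.reserveMemory` when off-heap allocations made via `ByteBuffer.allocateDirect` would exceed the JVM's direct-memory budget (`-XX:MaxDirectMemorySize`, which defaults to roughly the max heap size). The Kafka producer's network thread allocates such buffers for socket writes, so producers whose transactions stall and accumulate can plausibly exhaust that budget. A minimal, self-contained illustration of direct (off-heap) allocation — not Kafka-specific:

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // allocateDirect reserves native (off-heap) memory. The JVM tracks the
        // running total and throws "OutOfMemoryError: Direct buffer memory"
        // once it would exceed -XX:MaxDirectMemorySize.
        ByteBuffer buf = ByteBuffer.allocateDirect(1024 * 1024); // 1 MiB off-heap
        System.out.println(buf.isDirect());   // true: backed by native memory
        System.out.println(buf.capacity());   // 1048576
    }
}
```

If this is indeed the failure mode, raising the direct-memory limit (or, in Flink terms, the task off-heap memory) only buys time; the accumulation of uncommitted transactional producers would still need to be addressed.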