Can you elaborate a bit more? While idleness is not what we're seeing now, it 
could perhaps become an issue later on. What is it about a partition going 
idle that would lead to state buildup?

Thanks,
Vishal
On 25 Jan 2023 at 9:14 PM +0530, Martijn Visser <martijnvis...@apache.org>, 
wrote:
> Hi Vishal,
>
> Could idleness be an issue? I could see that if a partition becomes idle and 
> the Kafka Source does not go into an idle state, more internal state (to 
> commit Kafka transactions) can build up over time and ultimately cause an 
> out of memory problem. See 
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness
>  for more details on this.
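>
> As a rough sketch only (assuming event-time processing; the variables env 
> and kafkaSource and the 1-minute timeout are placeholders, not settings from 
> this job), idleness detection is configured on the watermark strategy that 
> is passed along with the source:
>
>     import java.time.Duration;
>     import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>
>     WatermarkStrategy<String> strategy = WatermarkStrategy
>         .<String>forMonotonousTimestamps()
>         // Declare a source split idle after 1 minute without records, so
>         // downstream watermarks can still advance past the quiet partition.
>         .withIdleness(Duration.ofMinutes(1));
>
>     DataStream<String> stream =
>         env.fromSource(kafkaSource, strategy, "Kafka Source");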
>
> Best regards,
>
> Martijn
>
> > Op ma 23 jan. 2023 om 10:53 schreef Vishal Surana <vis...@moengage.com>:
> > > Could it be that Flink is unable to commit offsets to Kafka? I know that 
> > > Flink's checkpoint mechanism isn't tied to its ability to commit offsets, 
> > > but at the same time we've seen that the job can take hours to commit 
> > > offsets while checkpoints go through successfully during that period. 
> > > With Kafka transactions enabled, however, the offset commit is now 
> > > required to happen.
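> > >
> > > For context, a rough sketch of the source-side behaviour (broker, topic 
> > > and group id below are placeholders): offsets are only committed back to 
> > > Kafka when a checkpoint completes, and Flink restores from its own 
> > > checkpointed offsets rather than from the ones committed to Kafka.
> > >
> > >     // KafkaSource comes from the flink-connector-kafka dependency.
> > >     KafkaSource<String> source = KafkaSource.<String>builder()
> > >         .setBootstrapServers("broker:9092")            // placeholder
> > >         .setTopics("input-topic")                      // placeholder
> > >         .setGroupId("my-consumer-group")               // placeholder
> > >         .setValueOnlyDeserializer(new SimpleStringSchema())
> > >         // Default behaviour: commit consumed offsets back to Kafka on
> > >         // completed checkpoints; the committed offsets only expose
> > >         // progress and are not what exactly-once relies on.
> > >         .setProperty("commit.offsets.on.checkpoint", "true")
> > >         .build();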
> > >
> > > Thanks,
> > > Vishal
> > > On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vis...@moengage.com>, 
> > > wrote:
> > > > My job runs fine when running without Kafka transactions. The source 
> > > > and sink in my job are Kafka, with a couple of RocksDB based stateful 
> > > > operators taking 100GB each.
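> > > >
> > > > For reference, a rough sketch of the kind of checkpoint and sink 
> > > > configuration involved (broker, topic, interval and timeout values are 
> > > > placeholders, not the actual settings of this job):
> > > >
> > > >     // KafkaSink and DeliveryGuarantee come from flink-connector-kafka.
> > > >     StreamExecutionEnvironment env =
> > > >         StreamExecutionEnvironment.getExecutionEnvironment();
> > > >     env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
> > > >     env.getCheckpointConfig().enableUnalignedCheckpoints();
> > > >
> > > >     KafkaSink<String> sink = KafkaSink.<String>builder()
> > > >         .setBootstrapServers("broker:9092")              // placeholder
> > > >         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
> > > >             .setTopic("output-topic")                    // placeholder
> > > >             .setValueSerializationSchema(new SimpleStringSchema())
> > > >             .build())
> > > >         // Exactly-once needs Kafka transactions plus a transactional
> > > >         // id prefix that is unique per job.
> > > >         .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
> > > >         .setTransactionalIdPrefix("TRANSACTION_ID_PREFIX")
> > > >         // Should exceed the maximum expected checkpoint duration.
> > > >         .setProperty("transaction.timeout.ms", "900000")
> > > >         .build();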
> > > >
> > > > When I enable Kafka transactions, things go well initially and we can 
> > > > see high throughput as well. However, after a few hours the job seems 
> > > > to get stuck: it's unable to commit the transaction, and because we've 
> > > > enabled exactly-once processing with unaligned checkpoints, it can no 
> > > > longer consume any messages. The number of hours it takes might vary, 
> > > > but it always happens and eventually the job crashes with this 
> > > > exception:
> > > >
> > > > ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught 
> > > > exception in thread 'kafka-producer-network-thread | 
> > > > producer-TRANSACTION_ID_PREFIX-1-17060':
> > > > java.lang.OutOfMemoryError: Direct buffer memory
> > > >     at java.nio.Bits.reserveMemory(Bits.java:175)
> > > >     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> > > >     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> > > >     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> > > >     at sun.nio.ch.IOUtil.write(IOUtil.java:164)
> > > >     at sun.nio.ch.IOUtil.write(IOUtil.java:130)
> > > >     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
> > > >     at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
> > > >     at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
> > > >     at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
> > > >     at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
> > > >     at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
> > > >     at org.apache.kafka.common.network.Selector.write(Selector.java:644)
> > > >     at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
> > > >     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
> > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> > > >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
> > > >     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
> > > >     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
> > > >     at java.lang.Thread.run(Thread.java:829)
> > > >
> > > > Why would this suddenly start happening? Any suggestions on how to 
> > > > fix it?
> > > >
> > > > --
> > > > Regards,
> > > > Vishal
