Could it be that Flink is unable to commit offsets to Kafka? I know that
Flink’s checkpoint mechanism isn’t tied to its ability to commit offsets, but
at the same time, we’ve seen that the job can take hours to commit offsets
while checkpoints go through successfully during that period. With Kafka
transactions enabled, however, the offset commit is now required to happen.

Thanks,
Vishal
On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana <vis...@moengage.com>, wrote:
> My job runs fine without Kafka transactions. Both the source and the sink
> are Kafka, and the job has a couple of RocksDB-based stateful operators
> taking 100GB each.
>
> When I enable Kafka transactions, things go well initially and we see high
> throughput as well. However, after a few hours, the job seems to get stuck
> because it's unable to commit the transaction, and so it's unable to consume
> any more messages, as we've enabled exactly-once processing with unaligned
> checkpoints. The number of hours it takes varies, but it always happens, and
> eventually the job crashes with this exception:
>
> ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught exception in thread 'kafka-producer-network-thread | producer-TRANSACTION_ID_PREFIX-1-17060':
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java: 175)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java: 118)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java: 317)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java: 242)
> at sun.nio.ch.IOUtil.write(IOUtil.java: 164)
> at sun.nio.ch.IOUtil.write(IOUtil.java: 130)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java: 493)
> at java.nio.channels.SocketChannel.write(SocketChannel.java: 507)
> at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java: 152)
> at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java: 58)
> at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java: 41)
> at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java: 430)
> at org.apache.kafka.common.network.Selector.write(Selector.java: 644)
> at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java: 637)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java: 593)
> at org.apache.kafka.common.network.Selector.poll(Selector.java: 481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java: 561)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java: 327)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java: 242)
> at java.lang.Thread.run(Thread.java: 829)
>
> What could be causing this to happen all of a sudden? Any suggestions on how
> to fix it?
>
> --
> Regards,
> Vishal
