My job runs fine without Kafka transactions. Both the source and the sink
are Kafka, and the job has a couple of RocksDB-based stateful operators,
each taking 100GB.

When I enable Kafka transactions, things go well initially and throughput
is high. However, after a few hours the job appears to get stuck because it
cannot commit the transaction. Since we've enabled exactly-once processing
with unaligned checkpoints, it then cannot consume any more messages. The
number of hours varies, but it always happens, and eventually the job
crashes with this exception:

ERROR org.apache.kafka.common.utils.KafkaThread ::::: - Uncaught exception in thread 'kafka-producer-network-thread | producer-TRANSACTION_ID_PREFIX-1-17060':
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:175)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
    at sun.nio.ch.IOUtil.write(IOUtil.java:164)
    at sun.nio.ch.IOUtil.write(IOUtil.java:130)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
    at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
    at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
    at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
    at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
    at org.apache.kafka.common.network.Selector.write(Selector.java:644)
    at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
    at java.lang.Thread.run(Thread.java:829)

What could be causing this to happen all of a sudden? Any suggestions on
how to fix it?
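
Since the error is about direct buffer memory, one way to watch it while the
job runs might be to poll the JVM's buffer pool MXBeans. The following is
only a minimal sketch using the standard java.lang.management API; the class
name DirectMemoryProbe and the 1 MiB allocation in main are illustrative
assumptions and are not taken from the job:

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper, not part of the job: reports how much memory the
// JVM's "direct" buffer pool is currently using.
final class DirectMemoryProbe {

    /** Returns the bytes used by the "direct" buffer pool, or -1 if no such pool is reported. */
    static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Allocate 1 MiB off-heap so the change is visible in the pool statistics.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
        long after = directMemoryUsed();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after:  " + after + " bytes");
        System.out.println("buffer is direct:     " + buffer.isDirect());
    }
}
```

Logging this value periodically from the task manager JVM might show whether
direct memory grows steadily over the hours or jumps around the time the
transaction commit stalls.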

-- 
Regards,
Vishal
