Hello,

With the default memory settings, after about 5000 records have gone through my KafkaFlinkConsumer and a few operators in my pipeline, I get the error below:
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
	at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
	at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
	at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]

I read up on this and increased the memory settings as follows, which took care of that problem:

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2300m
taskmanager.memory.network.max: 768m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.managed.fraction: 0.45
taskmanager.memory.network.min: 192m
taskmanager.memory.task.off-heap.size: 512m

But now I run into the issue below at exactly or approximately the same point, i.e. after about 5000 records. It doesn't matter whether I send the records in a burst or stagger them; strangely, once past that limit it always blows up, somewhere between roughly 4,500 and 5,500 records.

I am now doing multiple state lookups against the Queryable State. Previously I did about half as many lookups as I do now, and I could ingest millions of records. Simply doubling the number of lookups has caused the Queryable State queries to fail.

What memory settings do I have to change to rectify this? Any help will be appreciated.

I have also sometimes seen the BufferPool error:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 67.
Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'queryable-data'.
	at org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
	at org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
	at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
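For reference, below is a rough sketch of how the Flink 1.12 TaskManager memory model would split taskmanager.memory.process.size: 2300m under the TaskManager settings listed earlier. It assumes the 1.12 defaults for every option not set explicitly (JVM metaspace 256m; JVM overhead fraction 0.1 clamped to [192m, 1g]; framework heap and framework off-heap 128m each), works in MiB, and ignores Flink's internal rounding, so treat the numbers as approximate. The JobManager (1600m) is not covered. The class and method names are hypothetical, made up for this illustration.

```java
/**
 * Rough sketch of the Flink 1.12 TaskManager memory breakdown when
 * taskmanager.memory.process.size is the fixed input. Values are in MiB.
 * Defaults for unlisted options are ASSUMPTIONS; Flink's rounding is ignored.
 * Class and method names are hypothetical.
 */
final class TmMemoryBreakdown {

    // Values taken from the flink-conf.yaml settings in the post (MiB)
    static final double PROCESS_SIZE = 2300;
    static final double NETWORK_FRACTION = 0.1;
    static final double NETWORK_MIN = 192;
    static final double NETWORK_MAX = 768;
    static final double MANAGED_FRACTION = 0.45;
    static final double TASK_OFF_HEAP = 512;

    // ASSUMED Flink 1.12 defaults for options not set explicitly (MiB)
    static final double METASPACE = 256;
    static final double OVERHEAD_FRACTION = 0.1;
    static final double OVERHEAD_MIN = 192;
    static final double OVERHEAD_MAX = 1024;
    static final double FRAMEWORK_HEAP = 128;
    static final double FRAMEWORK_OFF_HEAP = 128;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    // JVM overhead: fraction of the whole process size, clamped to [min, max]
    static double jvmOverhead() {
        return clamp(PROCESS_SIZE * OVERHEAD_FRACTION, OVERHEAD_MIN, OVERHEAD_MAX);
    }

    // Total Flink memory: process size minus metaspace and JVM overhead
    static double totalFlinkMemory() {
        return PROCESS_SIZE - METASPACE - jvmOverhead();
    }

    // Network memory: fraction of total Flink memory, clamped to [min, max]
    static double network() {
        return clamp(totalFlinkMemory() * NETWORK_FRACTION, NETWORK_MIN, NETWORK_MAX);
    }

    // Managed memory: fraction of total Flink memory
    static double managed() {
        return totalFlinkMemory() * MANAGED_FRACTION;
    }

    // Task heap: whatever remains after all other components are carved out
    static double taskHeap() {
        return totalFlinkMemory() - FRAMEWORK_HEAP - FRAMEWORK_OFF_HEAP
                - TASK_OFF_HEAP - network() - managed();
    }

    // JVM direct memory limit (-XX:MaxDirectMemorySize) that Flink derives
    static double maxDirectMemory() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + network();
    }

    public static void main(String[] args) {
        System.out.printf("JVM overhead:       %.1f MiB%n", jvmOverhead());
        System.out.printf("Total Flink memory: %.1f MiB%n", totalFlinkMemory());
        System.out.printf("Network:            %.1f MiB%n", network());
        System.out.printf("Managed:            %.1f MiB%n", managed());
        System.out.printf("Task heap:          %.1f MiB%n", taskHeap());
        System.out.printf("Max direct memory:  %.1f MiB%n", maxDirectMemory());
    }
}
```

If those assumptions hold, network memory is pushed up to its 192m minimum (0.1 × 1814m is only about 181m), managed memory takes about 816m, and the task heap is left with only about 38 MiB. The JVM direct memory limit, which the "Direct buffer memory" error above runs into when the allocation happens inside the TaskManager, would come to roughly 832 MiB.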