Hello,

With the default memory settings, I get the error below after about 5000 
records have passed through my KafkaFlinkConsumer and some operators in my 
pipeline:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory
        at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
        at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
        at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
        at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
        at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]


I read up on this and increased the memory settings as shown below, which 
took care of that problem:

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2300m
taskmanager.memory.network.max: 768m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.managed.fraction: 0.45
taskmanager.memory.network.min: 192m
taskmanager.memory.task.off-heap.size: 512m
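
As a rough sketch of how these TaskManager settings might break down, the 
arithmetic can be written out as below. The defaults marked ASSUMED (metaspace 
256m, JVM overhead fraction 0.1 with min 192m and max 1g, framework heap and 
framework off-heap 128m each) and the rule that the direct memory limit is 
framework off-heap + task off-heap + network are my reading of the Flink 1.12 
memory model docs, not values verified on this cluster. The class and method 
names are made up for illustration.

```java
/** Back-of-the-envelope split of the TaskManager process size, in whole MB. */
public class TaskManagerMemorySketch {
    // Values taken from the settings listed above.
    static final long PROCESS_MB = 2300;       // taskmanager.memory.process.size
    static final long NETWORK_MIN_MB = 192;    // taskmanager.memory.network.min
    static final long NETWORK_MAX_MB = 768;    // taskmanager.memory.network.max
    static final int NETWORK_PERCENT = 10;     // taskmanager.memory.network.fraction
    static final int MANAGED_PERCENT = 45;     // taskmanager.memory.managed.fraction
    static final long TASK_OFF_HEAP_MB = 512;  // taskmanager.memory.task.off-heap.size

    // ASSUMED Flink 1.12 defaults (not set in the config above).
    static final long METASPACE_MB = 256;
    static final int OVERHEAD_PERCENT = 10;
    static final long OVERHEAD_MIN_MB = 192;
    static final long OVERHEAD_MAX_MB = 1024;
    static final long FRAMEWORK_HEAP_MB = 128;
    static final long FRAMEWORK_OFF_HEAP_MB = 128;

    static long clamp(long value, long min, long max) {
        return Math.max(min, Math.min(max, value));
    }

    // Total Flink memory = process size - metaspace - JVM overhead.
    static long totalFlinkMb() {
        long overhead = clamp(PROCESS_MB * OVERHEAD_PERCENT / 100,
                OVERHEAD_MIN_MB, OVERHEAD_MAX_MB);
        return PROCESS_MB - METASPACE_MB - overhead;
    }

    // Managed memory is a fraction of total Flink memory.
    static long managedMb() {
        return totalFlinkMb() * MANAGED_PERCENT / 100;
    }

    // Network memory is a fraction of total Flink memory, clamped to [min, max].
    static long networkMb() {
        return clamp(totalFlinkMb() * NETWORK_PERCENT / 100,
                NETWORK_MIN_MB, NETWORK_MAX_MB);
    }

    // Task heap is whatever remains after every other component is subtracted.
    static long taskHeapMb() {
        return totalFlinkMb() - managedMb() - networkMb()
                - FRAMEWORK_HEAP_MB - FRAMEWORK_OFF_HEAP_MB - TASK_OFF_HEAP_MB;
    }

    // ASSUMED: the JVM direct memory limit covers these three components.
    static long directMemoryLimitMb() {
        return FRAMEWORK_OFF_HEAP_MB + TASK_OFF_HEAP_MB + networkMb();
    }

    public static void main(String[] args) {
        System.out.println("total Flink memory (MB):  " + totalFlinkMb());
        System.out.println("managed memory (MB):      " + managedMb());
        System.out.println("network memory (MB):      " + networkMb());
        System.out.println("task heap (MB):           " + taskHeapMb());
        System.out.println("direct memory limit (MB): " + directMemoryLimitMb());
    }
}
```

Under those assumptions this comes to roughly 816m managed, 192m network, a 
direct memory limit of about 832m, and only about 38m left for task heap. The 
real numbers may differ if any of the assumed defaults is different here.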


But now I hit the issue below at roughly the same point, i.e. after about 
5000 records. It doesn't matter whether I send them in a burst or stagger 
them; strangely, it always blows up once it reaches that limit, somewhere 
between about 4,500 and 5,500 records.

I am now doing multiple state lookups against the Queryable State. Previously 
I did about 50% of the lookups I do now, and I could ingest millions of 
records. But simply doubling the number of lookups has caused the Queryable 
State to fail.

What memory settings do I have to change to rectify this? Any help will be 
appreciated.

I have also seen the BufferPool error sometimes … 


java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 67.
 Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'queryable-data'.
        at org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
        at org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
        at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:517)
        at akka.actor.Actor.aroundReceive$(Actor.scala:515)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
