Hello,
Could you check that the TaskManagers haven't failed (which would unregister
their KV states) and are still running at the time of the query?
Possibly, after the memory settings were changed, a different error occurs,
and it gets reported later than the moment the state is unregistered.
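To tell this failure mode apart from others on the client side, one option is to check whether the cause chain of a failed getKvState(...) call contains UnknownKvStateLocation, the exception in your second trace. The KvStateFailures class is a hypothetical sketch using only the JDK, not a Flink API. It matches by class name so it compiles without flink-runtime, and it also compares the toString() prefix in case the exception was wrapped for RPC transport in a way that keeps only its original class name (an assumption).

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/**
 * Hypothetical helper (not part of Flink): checks whether a failure from a
 * queryable-state lookup was caused by an unregistered KvState location.
 */
final class KvStateFailures {

    static final String UNKNOWN_LOCATION =
            "org.apache.flink.runtime.query.UnknownKvStateLocation";

    private KvStateFailures() {}

    /** True if any throwable in the cause chain carries the given class name. */
    static boolean hasCause(Throwable error, String className) {
        // Identity-based set guards against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = error; t != null && seen.add(t); t = t.getCause()) {
            // Compare the runtime class name, and also the toString() prefix in case
            // the original exception was replaced by a wrapper that keeps its name
            // only in string form (assumption about RPC serialization).
            String text = String.valueOf(t);
            if (t.getClass().getName().equals(className)
                    || text.equals(className)
                    || text.startsWith(className + ":")) {
                return true;
            }
        }
        return false;
    }

    static boolean isUnknownKvStateLocation(Throwable error) {
        return hasCause(error, UNKNOWN_LOCATION);
    }
}
```

When isUnknownKvStateLocation(...) returns true for the exception from the failed future, that is the moment to look at the web UI or the TaskManager logs for a TaskManager that failed before the query.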
Regards,
Roman
On Sat, Jul 24, 2021 at 12:50 AM Sandeep khanzode wrote:
>
> Hello,
>
> With the default memory settings, after about 5000 records in my
> KafkaFlinkConsumer and some operators in my pipeline, I get the following error:
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
> at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
> at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>
> I read up on this and increased the memory settings as follows, which
> took care of that problem:
>
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 2300m
> taskmanager.memory.network.max: 768m
> taskmanager.memory.network.fraction: 0.1
> taskmanager.memory.managed.fraction: 0.45
> taskmanager.memory.network.min: 192m
> taskmanager.memory.task.off-heap.size: 512m
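For reference, here is a rough estimate of how this configuration splits the 2300m TaskManager process, as a Java sketch. It assumes Flink 1.12 defaults for the options not set above (JVM metaspace 256m; JVM overhead fraction 0.1 clamped to [192m, 1g]; framework heap and framework off-heap 128m each) and ignores Flink's exact rounding, so treat the numbers as approximate. The figure most relevant to "Direct buffer memory" is the JVM direct memory limit, which Flink sets to framework off-heap + task off-heap + network memory. That limit only matters for the first error if the QueryableStateClient runs inside a TaskManager JVM (an assumption, suggested by the flink-dist frames in the trace).

```java
/**
 * Rough TaskManager memory breakdown (MiB) for the configuration above.
 * ASSUMED Flink 1.12 defaults (not in the config): JVM metaspace 256m,
 * JVM overhead fraction 0.1 clamped to [192m, 1024m], framework heap 128m,
 * framework off-heap 128m. Flink's exact byte rounding is ignored.
 */
final class TmMemoryEstimate {

    static final class Breakdown {
        final double totalFlink, network, managed, taskHeap, maxDirect;

        Breakdown(double totalFlink, double network, double managed,
                  double taskHeap, double maxDirect) {
            this.totalFlink = totalFlink;
            this.network = network;
            this.managed = managed;
            this.taskHeap = taskHeap;
            this.maxDirect = maxDirect;
        }
    }

    private TmMemoryEstimate() {}

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    static Breakdown estimate(double processMb) {
        double metaspace = 256;                               // assumed default
        double overhead = clamp(0.1 * processMb, 192, 1024);  // assumed defaults
        double totalFlink = processMb - metaspace - overhead;

        // From the config: network.fraction 0.1, network.min 192m, network.max 768m.
        double network = clamp(0.1 * totalFlink, 192, 768);
        // From the config: managed.fraction 0.45.
        double managed = 0.45 * totalFlink;
        double frameworkHeap = 128, frameworkOffHeap = 128;   // assumed defaults
        double taskOffHeap = 512;                             // from the config

        // Task heap is whatever remains of total Flink memory.
        double taskHeap = totalFlink - frameworkHeap - frameworkOffHeap
                - taskOffHeap - network - managed;
        // JVM direct memory limit (-XX:MaxDirectMemorySize) as Flink derives it.
        double maxDirect = frameworkOffHeap + taskOffHeap + network;
        return new Breakdown(totalFlink, network, managed, taskHeap, maxDirect);
    }
}
```

For estimate(2300) this yields roughly 1814m total Flink memory, 192m network memory (0.1 × 1814m ≈ 181m is below network.min, so the minimum applies), about 816m managed memory, about 38m task heap and an 832m direct memory limit.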
>
> But now I get the error below at exactly or approximately the same point,
> i.e. after about 5000 records. It doesn't matter whether I send them in a
> burst or stagger them; strangely, it always blows up once it reaches that
> limit, i.e. somewhere around 4,500 to 5,500 records.
>
> Now I am doing multiple state lookups against the Queryable State. Previously
> I did about half as many lookups as I do now, and I could ingest millions
> of records. But simply doubling the number of lookups has caused the
> Queryable State to fail.
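Since the failures seem to follow the number of lookups, one way to test that hypothesis (a suggestion, not something established in this thread) is to cap how many lookups are in flight at once. QueryableStateClient#getKvState(...) returns a CompletableFuture, so a semaphore that is released on completion is enough. BoundedLookups is a hypothetical helper, not a Flink API, and the cap is an arbitrary tuning value.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/**
 * Hypothetical wrapper (not a Flink API): caps how many asynchronous lookups
 * are in flight at once. The supplier would wrap a call such as
 * QueryableStateClient#getKvState(...), which returns a CompletableFuture.
 */
final class BoundedLookups {

    private final Semaphore permits;

    BoundedLookups(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> lookup) {
        // Blocks the calling thread while maxInFlight lookups are pending.
        permits.acquireUninterruptibly();
        try {
            // Release the permit once the lookup completes, successfully or not.
            return lookup.get().whenComplete((value, error) -> permits.release());
        } catch (RuntimeException e) {
            // The lookup never started, so give the permit back immediately.
            permits.release();
            throw e;
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

A call would then look like lookups.submit(() -> client.getKvState(jobId, "queryable-data", key, keyTypeInfo, descriptor)), where client, jobId, key, keyTypeInfo and descriptor are placeholders for your own objects. Blocking the caller keeps the sketch simple; a non-blocking variant would queue requests instead.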
>
> What memory settings do I have to change to rectify this? Any help will be
> appreciated.
>
> I have also seen the BufferPool error sometimes …
>
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed request 67.
> Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No KvStateLocation found for KvState instance with name 'queryable-data'.
> at org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
> at org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
> at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.base/java.lan