Re: Queryable State Lookup Failure

2021-07-26 Thread Roman Khachatryan
Hello,

Could you check that the TMs didn't fail (and thereby unregister their KV
states), and that they are still running at the time of the query?
Probably, after changing the memory settings, there is another error that
only gets reported after the state has already been unregistered.
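(If it helps, the JobManager REST API lists the currently registered TMs; a
quick check like the sketch below, with the host/port adjusted to your setup,
shows whether any of them dropped out between the OOM and the query.)

// Rough sketch, not tied to your setup: list the registered TMs via the
// JobManager REST API (default port 8081) before and after the failure.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListTaskManagers {
    public static void main(String[] args) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/taskmanagers"))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Compare the returned TM ids/count over time and check the logs of
        // any TM that disappears around the time of the failed query.
        System.out.println(response.body());
    }
}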

Regards,
Roman

On Sat, Jul 24, 2021 at 12:50 AM Sandeep khanzode  wrote:
>
> Hello,
>
> With the default memory settings, after about 5000 records in my 
> KafkaFlinkConsumer, and some operators in my pipeline, I get the below error:
>
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
> at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
> at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) 
> ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
> at 
> org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241)
>  ~[flink-dist_2.12-1.12.1.jar:1.12.1]
>
>
>
> I read about this and tried to increase the memory settings as below, which 
> took care of that problem …
>
> jobmanager.memory.process.size: 1600m
> taskmanager.memory.process.size: 2300m
> taskmanager.memory.network.max: 768m
> taskmanager.memory.network.fraction: 0.1
> taskmanager.memory.managed.fraction: 0.45
> taskmanager.memory.network.min: 192m
> taskmanager.memory.task.off-heap.size: 512m
>
>
>
> But now I have the issue below at roughly the same point, i.e. after about 
> 5000 records. It doesn't matter whether I send them in a burst or stagger 
> them; strangely, after that limit it always blows up, at approximately 
> 4.5K to 5.5K records.
>
> Now I am doing multiple state lookups against the Queryable State. Previously 
> I did about 50% of the lookups I do now, and I could ingest millions of 
> records. But simply doubling the number of lookups has caused the Queryable 
> State to fail.
>
> What memory settings do I have to change to rectify this? Any help will be 
> appreciated.
>
> I have also seen the BufferPool error sometimes …
>
>
> java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed 
> request 67.
>  Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No 
> KvStateLocation found for KvState instance with name 'queryable-data'.
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
> at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
> Source)
> at java.base/java.lan

Queryable State Lookup Failure

2021-07-23 Thread Sandeep khanzode
Hello,

With the default memory settings, after about 5000 records in my 
KafkaFlinkConsumer, and some operators in my pipeline, I get the below error:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Unknown Source) ~[?:?]
at java.nio.DirectByteBuffer.<init>(Unknown Source) ~[?:?]
at java.nio.ByteBuffer.allocateDirect(Unknown Source) ~[?:?]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:755)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:731)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:247)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:215)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:147)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:356)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:139)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.NettyBufferPool.ioBuffer(NettyBufferPool.java:95)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.messages.MessageSerializer.writePayload(MessageSerializer.java:203)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.messages.MessageSerializer.serializeRequest(MessageSerializer.java:96)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.Client$EstablishedConnection.sendRequest(Client.java:546)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:159) 
~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:336)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:295)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at 
org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:241)
 ~[flink-dist_2.12-1.12.1.jar:1.12.1]


I read about this and tried to increase the memory settings as below, which 
took care of that problem … 

jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 2300m
taskmanager.memory.network.max: 768m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.managed.fraction: 0.45
taskmanager.memory.network.min: 192m
taskmanager.memory.task.off-heap.size: 512m


But now I have the issue below at roughly the same point, i.e. after about 5000 
records. It doesn't matter whether I send them in a burst or stagger them; 
strangely, after that limit it always blows up, at approximately 4.5K to 5.5K 
records.

Now I am doing multiple state lookups against the Queryable State. Previously I 
did about 50% of the lookups I do now, and I could ingest millions of records. 
But simply doubling the number of lookups has caused the Queryable State to 
fail.
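
For context, each lookup is roughly along these lines (a simplified sketch; the 
proxy host/port, job id, key, and value type here are placeholders, only the 
state name 'queryable-data' matches my job):

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class StateLookupSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder proxy address (9069 is the default queryable state
        // proxy port), placeholder job id and key.
        QueryableStateClient client = new QueryableStateClient("tm-host", 9069);
        JobID jobId = JobID.fromHexString("<job-id>");
        String key = "some-key";

        // Descriptor name matches the queryable state registered in the job;
        // the value type (Long) is just an example.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("queryable-data", Types.LONG);

        CompletableFuture<ValueState<Long>> future =
                client.getKvState(jobId, "queryable-data", key, Types.STRING, descriptor);

        // An UnknownKvStateLocation failure here means the JobMaster currently
        // has no TM registered for that state name.
        System.out.println(future.get().value());

        client.shutdownAndWait();
    }
}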

What memory settings do I have to change to rectify this? Any help will be 
appreciated.

I have also seen the BufferPool error sometimes … 


java.util.concurrent.ExecutionException: java.lang.RuntimeException: Failed 
request 67.
 Caused by: org.apache.flink.runtime.query.UnknownKvStateLocation: No 
KvStateLocation found for KvState instance with name 'queryable-data'.
at 
org.apache.flink.runtime.scheduler.SchedulerBase.requestKvStateLocation(SchedulerBase.java:839)
at 
org.apache.flink.runtime.jobmaster.JobMaster.requestKvStateLocation(JobMaster.java:554)
at jdk.internal.reflect.GeneratedMethodAccessor195.invoke(Unknown 
Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessag