Hi Chirag,

If you are able to reproduce the exception, could you first add some logs to print
the values of valueState, valueState.value(), inEvent and
inEvent.getPrizeDelta()?
I think any of them being null would cause a NullPointerException here.
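
As a rough illustration of the kind of check I mean, here is a minimal plain-Java sketch without any Flink dependency. The class NullCheckSketch, the Event stand-in for GameZoneInput, and the describe/add helpers are hypothetical names for illustration only. It also assumes getPrizeDelta() returns a boxed Integer, which I cannot tell from your snippet; if it returns a primitive int, only the state value or inEvent itself could be null.

```java
public class NullCheckSketch {

    // Hypothetical stand-in for GameZoneInput; assumes a boxed Integer getter.
    static class Event {
        private final Integer prizeDelta;

        Event(Integer prizeDelta) {
            this.prizeDelta = prizeDelta;
        }

        Integer getPrizeDelta() {
            return prizeDelta;
        }
    }

    // Builds a log line without dereferencing or unboxing anything that may be null.
    static String describe(Integer stateValue, Event inEvent) {
        Object delta = (inEvent == null) ? "n/a" : inEvent.getPrizeDelta();
        return "stateValue=" + stateValue
                + ", inEvent=" + (inEvent == null ? "null" : "present")
                + ", prizeDelta=" + delta;
    }

    // Mirrors the arithmetic on the failing line: unboxing a null Integer throws.
    static int add(Integer stateValue, Event inEvent) {
        return stateValue + inEvent.getPrizeDelta();
    }

    public static void main(String[] args) {
        Event bad = new Event(null);
        // Log first, so the offending value is visible before the failure.
        System.out.println(describe(0, bad));
        try {
            add(0, bad);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while unboxing a null operand");
        }
    }
}
```

In the real job the same log line would go right before the valueState.update(...) call, reading the state value into a local variable once so that the logged value is the one used in the addition.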

For the second exception, I found a similar issue [1], caused by concurrent
access to the value state. Do we have a similar situation here?

Best,
Yun

[1] https://issues.apache.org/jira/browse/FLINK-18587




------------------ Original Mail ------------------
Sender: Chirag Dewan <chirag.dewa...@yahoo.in>
Send Date: Sat Jun 5 20:29:37 2021
Recipients: User <user@flink.apache.org>
Subject: Multiple Exceptions during Load Test in State Access APIs with RocksDB

Hi,

I am getting multiple exceptions while trying to use RocksDB as a state backend.
I have 2 Task Managers with 2 task slots and 4 cores each.
Below is our setup:
Kafka (topic with 2 partitions) ----> FlinkKafkaConsumer (parallelism 2) ---->
KeyedProcessFunction (parallelism 4) ----> FlinkKafkaProducer (parallelism 1)
----> Kafka topic

public class Aggregator_KeyedExpression extends KeyedProcessFunction<Object, GameZoneInput, GameZoneOutput> {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<Integer>("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List<GameZoneOutput> outEvents)
            throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // -----> NullPointerException on this line
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}

While doing a load test, I get a NullPointerException in valueState.value(),
which seems strange as we would have updated the value state above.

Another strange thing is that this is observed only under load and
works fine otherwise.

We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)


Any leads would be appreciated. Thanks

Chirag
