Tried to attach a tar file but it got blocked. Resending with the files attached
individually.


OK, I have a minimal reproducible example. Attaching a tar file of the job that
crashed.

The crash has nothing to do with the number of state variables. It does, however,
seem to be caused by using a type for the state variable that is a class nested
in the KeyedProcessFunction.

I reduced it to a single state variable. The type of the state variable was a class
(ExecQueue) defined inside the class implementing KeyedProcessFunction. Moving the
ExecQueue definition to its own file fixed the problem.
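
For reference, the working version looks roughly like this (a sketch, not the exact
file; the fields mirror the nested ExecQueue pasted below, with RingBufferExec nested
statically so neither type holds a hidden reference to the function instance):

// ExecQueue.java -- sketch of the standalone version of the state type
package crash;

public class ExecQueue {

    public RingBufferExec queue;

    public ExecQueue() {}

    public ExecQueue(int initSize) {
        queue = new RingBufferExec(initSize);
    }

    // Nested statically here so it carries no reference to any enclosing instance.
    public static class RingBufferExec {

        public Integer size;
        public Integer count;

        public RingBufferExec() {}

        public RingBufferExec(int sizeIn) {
            size = sizeIn;
            count = 0;
        }
    }
}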



The attached example always crashes the TaskManager within 30 seconds to 5 minutes.



MyKeyedProcessFunction.java is attached and also pasted here:



package crash;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;

public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {

    private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

    // State whose type (ExecQueue) is a non-static inner class of this function;
    // this nesting appears to be what triggers the crash.
    public TypeInformation<ExecQueue> leftTypeInfo;
    public transient ValueState<ExecQueue> leftState;

    public int initQueueSize;
    public long emitFrequencyMs;

    public MyKeyedProcessFunction() {
        initQueueSize = 10;
        emitFrequencyMs = 1;
    }

    @Override
    public void open(Configuration conf) {
        leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>() {});
        leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
    }

    @Override
    public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
        try {
            ExecQueue eq = leftState.value();
            if (eq == null) {
                eq = new ExecQueue(10);
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            leftState.update(eq);
        } catch (Exception e) {
            LOG.error("Exception in processElement. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (StackTraceElement s : e.getStackTrace()) {
                LOG.error(s.toString());
            }
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
        try {
            // Read the state (the value itself is not used in this minimal example)
            // and re-register the timer.
            ExecQueue eq = leftState.value();
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
        } catch (Exception e) {
            LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
            for (StackTraceElement s : e.getStackTrace()) {
                LOG.error(s.toString());
            }
        }
    }

    // The state type, defined as an inner class of the KeyedProcessFunction.
    // Moving this class to its own file made the crash go away.
    public class ExecQueue {

        public RingBufferExec queue;

        public ExecQueue() {}

        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public class RingBufferExec {

            public Integer size;
            public Integer count;

            public RingBufferExec() {}

            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }
}


From: Dawid Wysakowicz <dwysakow...@apache.org>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
Subject: Re: state access causing segmentation fault


Hi,

It should be absolutely fine to use multiple state objects. I am not aware of
any limits on that. A minimal, reproducible example would definitely be
helpful. For those kinds of exceptions, I'd look into the serializers you use.
Other than that, I cannot think of an obvious reason for that kind of exception.
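
For example, one quick way to see which serializer Flink derives for the state type
(a sketch, assuming the Flink 1.9 type-extraction API; the logging is only illustrative):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// e.g. inside open():
TypeInformation<ExecQueue> info = TypeInformation.of(new TypeHint<ExecQueue>() {});
TypeSerializer<ExecQueue> serializer = info.createSerializer(new ExecutionConfig());
LOG.info("type info = " + info.getClass().getName()
        + ", serializer = " + serializer.getClass().getName());
// A GenericTypeInfo / KryoSerializer here means Flink could not treat the type
// as a POJO and fell back to Kryo.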

Best,

Dawid
On 08/10/2020 12:12, Colletta, Edward wrote:
Using Flink 1.9.2, Java, FsStateBackend. Running a session cluster on EC2
instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing
the Flink task manager. This seems to be caused by using 3 state variables in
the operator. The crash happens consistently after some load is processed.
This is the second time I have encountered this. The first time I had 3
ValueState variables; this time I had 2 ValueState variables and a MapState
variable. Both times the error was alleviated by removing one of the state
variables.
This time I replaced the 2 ValueState variables with a Tuple2 of the types of
the individual variables. I can try to put together a minimal example, but I
was wondering if anyone has encountered this problem.
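
(Roughly along these lines; LeftBuffer and RightRing are placeholders for the
actual types of the two original variables:)

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// LeftBuffer and RightRing are placeholders for the two original state types.
ValueStateDescriptor<Tuple2<LeftBuffer, RightRing>> combinedDesc =
        new ValueStateDescriptor<>("combined",
                TypeInformation.of(new TypeHint<Tuple2<LeftBuffer, RightRing>>() {}));
ValueState<Tuple2<LeftBuffer, RightRing>> combinedState =
        getRuntimeContext().getState(combinedDesc);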

Are there any documented limits on the number of state variables one operator can
use?

For background, the reason I use multiple state variables is that the operator
processes 2 types of inputs, Left and Right. When a Left is received it is put
into a PriorityQueue; when a Right is received I put it into a ring buffer.
I replaced the PriorityQueue with a queue of ids plus a MapState to hold the
elements. So Left is stored in a queue ValueState variable and a MapState
variable, and Right is stored in the ring buffer ValueState variable.
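
(A sketch of those three state handles; the names and the queue element type
ArrayDeque<Long> are placeholders, not the exact declarations:)

import java.util.ArrayDeque;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

// e.g. in open():
ValueState<ArrayDeque<Long>> leftIdQueue = getRuntimeContext().getState(
        new ValueStateDescriptor<>("leftIdQueue",
                TypeInformation.of(new TypeHint<ArrayDeque<Long>>() {})));
MapState<Long, Exec> leftElements = getRuntimeContext().getMapState(
        new MapStateDescriptor<>("leftElements", Long.class, Exec.class));
ValueState<RingBufferExec> rightRing = getRuntimeContext().getState(
        new ValueStateDescriptor<>("rightRing",
                TypeInformation.of(new TypeHint<RingBufferExec>() {})));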


Attachment: MyKeyedProcessFunction.java
Attachment: Exec.java
Attachment: StreamingJob.java
Attachment: Beacon.java
