Tried to attach tar file but it got blocked. Resending with files attached individually.
Ok, I have a minimal reproducible example. Attaching a tar file of the job that crashed.

The crash has nothing to do with the number of state variables. It does seem to be caused by using a state variable whose type is a class nested in the KeyedProcessFunction. I reduced the job to a single state variable whose type was a class (ExecQueue) defined inside the class implementing KeyedProcessFunction. Moving the ExecQueue definition to its own file fixed the problem.

The attached example always crashes the taskManager within 30 seconds to 5 minutes.

MyKeyedProcessFunction.java is attached and also cut and pasted here:

    package crash;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
    import org.apache.flink.util.Collector;

    public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {
        private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);

        public TypeInformation<ExecQueue> leftTypeInfo;
        public transient ValueState<ExecQueue> leftState;

        public int initQueueSize;
        public long emitFrequencyMs;

        public MyKeyedProcessFunction() {
            initQueueSize = 10;
            emitFrequencyMs = 1;
        }

        @Override
        public void open(Configuration conf) {
            leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
            leftState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("left", leftTypeInfo, null));
        }

        @Override
        public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
            try {
                ExecQueue eq = leftState.value();
                if (eq == null) {
                    eq = new ExecQueue(10);
                    ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + emitFrequencyMs);
                }
                leftState.update(eq);
            }
            catch (Exception e) {
                LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = " );
                for (java.lang.StackTraceElement s:e.getStackTrace())
                    LOG.error(s.toString());
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
            try {
                ExecQueue eq = leftState.value();
                ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + emitFrequencyMs);
            }
            catch (Exception e) {
                LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = " );
                for (java.lang.StackTraceElement s:e.getStackTrace())
                    LOG.error(s.toString());
            }
        }

        public class ExecQueue {
            public RingBufferExec queue;

            public ExecQueue (){}

            public ExecQueue (int initSize) {
                queue = new RingBufferExec(initSize);
            }

            public class RingBufferExec {
                public Integer size;
                public Integer count;

                public RingBufferExec(){ }

                public RingBufferExec(int sizeIn){
                    size = sizeIn;
                    count = 0;
                }
            }
        }
    }

From: Dawid Wysakowicz <dwysakow...@apache.org>
Sent: Thursday, October 8, 2020 6:26 AM
To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
Subject: Re: state access causing segmentation fault

Hi,

It should be absolutely fine to use multiple state objects. I am not aware of any limits to that. A minimal, reproducible example would definitely be helpful.

For those kinds of exceptions, I'd look into the serializers you use. Other than that I cannot think of an obvious reason for that kind of exception.

Best,
Dawid

On 08/10/2020 12:12, Colletta, Edward wrote:

Using Flink 1.9.2, Java, FsStateBackend. Running Session cluster on EC2 instances.

I have a KeyedProcessFunction that is causing a segmentation fault, crashing the Flink task manager.
This seems to be caused by using 3 state variables in the operator. The crash happens consistently after some load is processed.

This is the second time I have encountered this. The first time I had 3 ValueState variables; this time I had 2 ValueState variables and a MapState variable. Both times the error was alleviated by removing one of the state variables. This time I replaced the 2 ValueState variables with a Tuple2 of the types of the individual variables.

I can try to put together a minimal example, but I was wondering if anyone has encountered this problem. Are there any documented limits on the number of state variables one operator can use?

For background, the reason I use multiple state variables is that the operator processes 2 types of inputs, Left and Right. When a Left is received it is put into a PriorityQueue. When a Right is received I put it into a ring buffer. I replaced the PriorityQueue with a queue of Ids and a MapState to hold the elements. So Left is stored in a queue ValueState variable plus a MapState variable, and Right is stored in the ring buffer ValueState variable.
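
For anyone who hits the same thing, here is a rough sketch in plain Java (no Flink dependency) of how the nested ExecQueue differs from one moved to its own file. Every name in it (InnerClassCheck, Outer, StandaloneExecQueue, and the two helper methods) is made up for illustration and is not taken from the attached job. Flink's documented POJO rules ask for a public, standalone class (no non-static inner class) with a public no-argument constructor. The sketch only checks those two properties by reflection. Whether failing them is what actually leads to the segmentation fault is an assumption and is not confirmed in this thread.

```java
import java.lang.reflect.Modifier;

// Hypothetical names throughout; this is not the code from the attached job.
class InnerClassCheck {

    // Stand-in for MyKeyedProcessFunction: ExecQueue is a non-static inner
    // class, as in the crashing example.
    static class Outer {
        public class ExecQueue {
            public Integer size;
            public ExecQueue() {}
        }
    }

    // Stand-in for ExecQueue after moving it out of the function class.
    // A static nested class behaves like a top-level class for these checks.
    public static class StandaloneExecQueue {
        public Integer size;
        public Integer count;
        public StandaloneExecQueue() {}
        public StandaloneExecQueue(int sizeIn) { size = sizeIn; count = 0; }
    }

    // True if the class is a member class declared without `static`.
    static boolean isNonStaticInner(Class<?> c) {
        return c.isMemberClass() && !Modifier.isStatic(c.getModifiers());
    }

    // True if reflection finds a public constructor that takes no arguments.
    // For a non-static inner class the compiled constructor takes the
    // enclosing instance as a parameter, so this lookup fails.
    static boolean hasPublicNoArgConstructor(Class<?> c) {
        try {
            c.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Class<?>[] candidates = {Outer.ExecQueue.class, StandaloneExecQueue.class};
        for (Class<?> c : candidates) {
            System.out.println(c.getSimpleName()
                + " nonStaticInner=" + isNonStaticInner(c)
                + " publicNoArgCtor=" + hasPublicNoArgConstructor(c));
        }
    }
}
```

The inner ExecQueue reports as a non-static inner class with no public no-argument constructor visible to reflection, even though its source declares `public ExecQueue(){}`. The standalone version passes both checks, which matches the observation that moving ExecQueue to its own file made the problem go away.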
MyKeyedProcessFunction.java
Exec.java
StreamingJob.java
Beacon.java