Thanks for these pointers, Stefan.
I've started a fresh job and didn't migrate any state from a previous
execution. Moreover, all the fields of all the events I'm using are
declared final.
I've set a breakpoint to figure out which event is causing the problem,
and it turns out that Flink processes the incoming events for some time;
the exception is only thrown once a certain window triggers. The specific
code that causes the exception is as follows:
DataStream<IdleDuration> idleDuration = cleanedTrips
    .keyBy("license")
    .flatMap(new DetermineIdleDuration())
    .filter(duration -> duration.avg_idle_duration >= 0
        && duration.avg_idle_duration <= 240)
    .keyBy("location")
    .timeWindow(Time.minutes(10))
    .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input,
            Collector<IdleDuration> out) -> {
        double[] location = Iterables.get(input, 0).location;
        double avgDuration = StreamSupport
            .stream(input.spliterator(), false)
            .mapToDouble(idle -> idle.avg_idle_duration)
            .average()
            .getAsDouble();

        out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
    });
If the apply statement is removed, there is no exception at runtime.
The location field that is referenced by the keyBy statement is actually
a double[]. Could this cause the problems I'm experiencing?
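A quick scratch test outside of Flink seems to support that suspicion:
Java arrays inherit the identity-based Object.hashCode(), so two arrays
with identical contents (almost always) produce different hash codes.
This is my own minimal example, not code from the job:

import java.util.Arrays;

public class ArrayKeyHashTest {
    public static void main(String[] args) {
        double[] a = {40.7128, -74.0060};
        double[] b = {40.7128, -74.0060};

        // Arrays do not override hashCode(), so hashing is identity-based:
        // equal contents do not imply equal hash codes.
        System.out.println(a.hashCode() == b.hashCode());             // false (in practice)
        System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b)); // true (content-based)
    }
}

If the partitioning relies on hashCode(), I'd expect this to make routing
by a double[] key non-deterministic.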
You can find some more code for additional context in the attached document.
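In case the array key turns out to be the culprit, I could presumably work
around it by keying on a stable, content-based representation instead,
i.e. replacing the keyBy("location") call above with an explicit
KeySelector. This is just an untested sketch; Arrays.toString yields the
same string for the same contents, unlike the array's identity hash:

import java.util.Arrays;
import org.apache.flink.api.java.functions.KeySelector;

// Untested idea: derive a deterministic String key from the location array.
.keyBy(new KeySelector<IdleDuration, String>() {
    @Override
    public String getKey(IdleDuration duration) throws Exception {
        return Arrays.toString(duration.location);
    }
})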
Thanks for looking into this!
Steffen
On 20/02/2017 15:22, Stefan Richter wrote:
Hi,
Flink 1.2 partitions all keys into key-groups, the atomic units for
rescaling. This partitioning is done by hash partitioning and is also in sync
with the routing of tuples to operator instances (each parallel instance of a
keyed operator is responsible for some range of key-groups). This exception
means that Flink detected a tuple in the state backend of a parallel operator
instance that should not be there because, by its key hash, it belongs to a
different key-group. Phrased differently, this tuple belongs to a different
parallel operator instance. Whether this is a Flink bug or a user-code bug is
very hard to tell, and the log does not provide additional insights either. I
could see this happen if your keys are mutable and your code makes changes to
the key objects that alter their hash codes. Another question: did you
migrate your job from Flink 1.1.3 through an old savepoint, or did you do a
fresh start? Other than that, I can recommend checking your code for mutation
of keys. If this fails deterministically, you could also set a breakpoint at
the line of the exception and take a look at whether the key that is about to
be inserted is somehow special.
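To make the mechanism concrete, the assignment boils down to roughly the
following (a simplified sketch along the lines of KeyGroupRangeAssignment,
not the exact Flink source):

import org.apache.flink.util.MathUtils;

public final class KeyGroupSketch {

    // The key-group is derived from the key's hashCode(). If the hash code
    // is not stable, e.g. because the key object is mutated after
    // partitioning or because hashing is identity-based, the same logical
    // key can end up in a key-group owned by a different operator instance,
    // which is exactly what the exception guards against.
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    }

    // Each parallel operator instance is responsible for a contiguous range
    // of key-groups.
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }
}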
Best,
Stefan
On 20.02.2017, at 14:32, Steffen Hausmann <stef...@hausmann-family.de> wrote:
Hi there,
I'm having problems running a job on Flink 1.2.0 that executes successfully
on Flink 1.1.3. The job is supposed to read events from a Kinesis stream and
send its output to Elasticsearch. It actually initiates successfully on a
Flink 1.2.0 cluster running on YARN, but as soon as I start to ingest events
into the Kinesis stream, the job fails (see the attachment for more
information):
java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
    at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Any ideas what's going wrong here? The job executes successfully when it's
compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster.
Does this indicate a bug in my code, or is it rather a bug in Flink? How can
I debug this further?
Any guidance is highly appreciated.
Thanks,
Steffen
<log>
DataStream<IdleDuration> idleDuration = cleanedTrips
    .keyBy("license")
    .flatMap(new DetermineIdleDuration())
    .filter(duration -> duration.avg_idle_duration >= 0
        && duration.avg_idle_duration <= 240)
    .keyBy("location")
    .timeWindow(Time.minutes(10))
    .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input,
            Collector<IdleDuration> out) -> {
        double[] location = Iterables.get(input, 0).location;
        double avgDuration = StreamSupport
            .stream(input.spliterator(), false)
            .mapToDouble(idle -> idle.avg_idle_duration)
            .average()
            .getAsDouble();

        out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
    });
static class DetermineIdleDuration extends RichFlatMapFunction<TripEvent, IdleDuration> {

    private transient ValueState<TripEvent> tripCache;
    private final DecimalFormat df = new DecimalFormat("#.###");

    @Override
    public void flatMap(TripEvent currentTrip, Collector<IdleDuration> out) throws Exception {
        TripEvent previousTrip = tripCache.value();

        if (previousTrip != null) {
            double lat = Double.parseDouble(df.format(previousTrip.dropoff_lat));
            double lon = Double.parseDouble(df.format(previousTrip.dropoff_lon));
            long idle = new Duration(previousTrip.dropoff_time,
                currentTrip.pickup_time).getStandardMinutes();
            long timestamp = currentTrip.pickup_time.getMillis();

            out.collect(new IdleDuration(lat, lon, idle, timestamp));
        }

        tripCache.update(currentTrip);
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<TripEvent> descriptor = new ValueStateDescriptor<>(
            "previous trip",
            TypeInformation.of(new TypeHint<TripEvent>() {}),
            null);

        tripCache = getRuntimeContext().getState(descriptor);
    }
}
public class IdleDuration extends Document {

    public final double[] location;
    public final double avg_idle_duration;

    public IdleDuration() {
        super(-1L);
        location = null;
        avg_idle_duration = 0;
    }

    public IdleDuration(double[] location, double avg_idle_duration, long timestamp) {
        super(timestamp);
        this.location = location;
        this.avg_idle_duration = avg_idle_duration;
    }

    public IdleDuration(double lat, double lon, double avg_idle_duration, long timestamp) {
        super(timestamp);
        this.avg_idle_duration = avg_idle_duration;
        this.location = new double[] {lon, lat};
    }
}