Thanks for these pointers, Stefan.

I've started a fresh job and didn't migrate any state from a previous execution. Moreover, all the fields of all the events I'm using are declared final.

I've set a breakpoint to figure out which event is causing the problem, and it turns out that Flink processes the incoming events fine for a while; the exception is only thrown once a certain window triggers. The specific code that causes the exception is as follows:

DataStream<IdleDuration> idleDuration = cleanedTrips
        .keyBy("license")
        .flatMap(new DetermineIdleDuration())
        .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240)
        .keyBy("location")
        .timeWindow(Time.minutes(10))
        .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, Collector<IdleDuration> out) -> {
            double[] location = Iterables.get(input, 0).location;
            double avgDuration = StreamSupport
                    .stream(input.spliterator(), false)
                    .mapToDouble(idle -> idle.avg_idle_duration)
                    .average()
                    .getAsDouble();

            out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
        });

If the apply statement is removed, no exception is thrown at runtime.

The location field referenced by the keyBy statement is actually a double[]. Could this be causing the problems I'm experiencing?
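If so, I assume I would have to key on a value-based representation of the coordinates instead of the array itself, since Java arrays only have identity-based hashCode/equals. Something along these lines, perhaps (just an untested sketch, using a KeySelector and org.apache.flink.api.java.tuple.Tuple2):

        .keyBy(new KeySelector<IdleDuration, Tuple2<Double, Double>>() {
            @Override
            public Tuple2<Double, Double> getKey(IdleDuration duration) {
                // Tuple2 hashes by value, so equal coordinates are always
                // routed to the same key group, unlike a double[]
                return Tuple2.of(duration.location[0], duration.location[1]);
            }
        })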

You can find some more code for additional context in the attached document.

Thanks for looking into this!

Steffen



On 20/02/2017 15:22, Stefan Richter wrote:
Hi,

Flink 1.2 partitions all keys into key groups, the atomic units for rescaling. This partitioning is done by hash partitioning and is also in sync with the routing of tuples to operator instances (each parallel instance of a keyed operator is responsible for some range of key groups). The exception means that Flink detected a tuple in the state backend of a parallel operator instance that should not be there because, by its key hash, it belongs to a different key group, or, phrased differently, to a different parallel operator instance.

Whether this is a Flink bug or a bug in user code is very hard to tell, and the log does not provide additional insight either. I could see this happening if your keys are mutable and your code changes a key object in a way that alters its hash code. Another question: did you migrate your job from Flink 1.1.3 through an old savepoint, or did you do a fresh start?

Other than that, I recommend checking your code for mutation of keys. If this fails deterministically, you could also set a breakpoint on the line from the exception and check whether the key that is about to be inserted is somehow special.
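If it helps with the breakpoint, you can also recompute the key group that Flink derives from a key's current hash code and watch it over time. The following is only a rough, untested sketch against the internal KeyGroupRangeAssignment utility from flink-runtime; 128 and "someKey" are placeholders for your job's actual max parallelism and the key in question:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {

    // Recomputes the key group the way Flink 1.2 does internally
    // (a murmur hash of key.hashCode(), scaled to the max parallelism).
    static int keyGroupOf(Object key, int maxParallelism) {
        return KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;  // placeholder: use your job's max parallelism
        Object key = "someKey";    // placeholder: the key about to be inserted

        System.out.println(keyGroupOf(key, maxParallelism));
    }
}

If this value changes between two evaluations for the same logical key, the key's hash code has been mutated, and its records will be routed to the wrong parallel operator instance.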

Best,
Stefan


Am 20.02.2017 um 14:32 schrieb Steffen Hausmann <stef...@hausmann-family.de>:

Hi there,

I’m having problems running a job on Flink 1.2.0 that executes successfully on Flink 1.1.3. The job is supposed to read events from a Kinesis stream and send output to Elasticsearch. It actually starts successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I ingest events into the Kinesis stream, the job fails (see the attachment for more information):

java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
    at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

Any ideas what’s going wrong here? The job executes successfully when it’s compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. Does this indicate a bug in my code, or is it rather a bug in Flink? How can I debug this further?

Any guidance is highly appreciated.

Thanks,

Steffen

<log>

DataStream<IdleDuration> idleDuration = cleanedTrips
        .keyBy("license")
        .flatMap(new DetermineIdleDuration())
        .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240)
        .keyBy("location")
        .timeWindow(Time.minutes(10))
        .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, Collector<IdleDuration> out) -> {
            double[] location = Iterables.get(input, 0).location;
            double avgDuration = StreamSupport
                    .stream(input.spliterator(), false)
                    .mapToDouble(idle -> idle.avg_idle_duration)
                    .average()
                    .getAsDouble();

            out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
        });


static class DetermineIdleDuration extends RichFlatMapFunction<TripEvent, IdleDuration> {

    private transient ValueState<TripEvent> tripCache;

    private final DecimalFormat df = new DecimalFormat("#.###");

    @Override
    public void flatMap(TripEvent currentTrip, Collector<IdleDuration> out) throws Exception {
        TripEvent previousTrip = tripCache.value();

        if (previousTrip != null) {
            // round the drop-off coordinates to three decimal places
            double lat = Double.parseDouble(df.format(previousTrip.dropoff_lat));
            double lon = Double.parseDouble(df.format(previousTrip.dropoff_lon));
            long idle = new Duration(previousTrip.dropoff_time, currentTrip.pickup_time).getStandardMinutes();
            long timestamp = currentTrip.pickup_time.getMillis();

            out.collect(new IdleDuration(lat, lon, idle, timestamp));
        }

        tripCache.update(currentTrip);
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<TripEvent> descriptor = new ValueStateDescriptor<>(
                "previous trip", TypeInformation.of(new TypeHint<TripEvent>() {}), null);

        tripCache = getRuntimeContext().getState(descriptor);
    }
}

public class IdleDuration extends Document {
    public final double[] location;
    public final double avg_idle_duration;

    public IdleDuration() {
        super(-1L);
        location = null;
        avg_idle_duration = 0;
    }

    public IdleDuration(double[] location, double avg_idle_duration, long timestamp) {
        super(timestamp);
        this.location = location;
        this.avg_idle_duration = avg_idle_duration;
    }

    public IdleDuration(double lat, double lon, double avg_idle_duration, long timestamp) {
        super(timestamp);
        this.avg_idle_duration = avg_idle_duration;
        // note: the location is stored in [lon, lat] order
        this.location = new double[] {lon, lat};
    }
}
