Hi,

if you key is a double[], even if the field is a final double[], it is mutable 
because the array entries can be mutated and maybe that is what happened? You 
can check if the following two points are in sync, hash-wise: 
KeyGroupStreamPartitioner::selectChannels and 
AbstractKeyedStateBackend::setCurrentKey. The first method basically determines 
to which parallel operator a tuple is routed in a keyed stream. The second is 
determining the tuple’s key group for the backend. Both must be in sync w.r.t. 
their result of the key-group that is determined for the tuple. And this 
assignment is done based on the hash of the key. Therefore, the hash of the 
tuple’s key should never change and must be immutable. If you can notice a 
change in hash code, that change is what breaks your code. I am pretty sure 
that Flink 1.1.x might just silently accept a mutation of the key, but actually 
this is arguably incorrect.

Best,
Stefan

> Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <stef...@hausmann-family.de>:
> 
> Thanks for these pointers, Stefan.
> 
> I've started a fresh job and didn't migrate any state from previous 
> execution. Moreover, all the fields of all the events I'm using are declared 
> final.
> 
> I've set a breakpoint to figure out what event is causing the problem, and it 
> turns out that Flink starts processing the incoming events for some time and 
> only when a certain window triggers an exception is thrown. The specific code 
> that causes the exception is as follows:
> 
>> DataStream<IdleDuration> idleDuration = cleanedTrips
>>        .keyBy("license")
>>        .flatMap(new DetermineIdleDuration())
>>        .filter(duration -> duration.avg_idle_duration >= 0 && 
>> duration.avg_idle_duration <= 240)
>>        .keyBy("location")
>>        .timeWindow(Time.minutes(10))
>>        .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, 
>> Collector<IdleDuration> out) -> {
>>            double[] location = Iterables.get(input, 0).location;
>>            double avgDuration = StreamSupport
>>                    .stream(input.spliterator(), false)
>>                    .mapToDouble(idle -> idle.avg_idle_duration)
>>                    .average()
>>                    .getAsDouble();
>> 
>>            out.collect(new IdleDuration(location, avgDuration, 
>> window.maxTimestamp()));
>>        });
> 
> If the apply statement is removed, there is no exception during runtime.
> 
> The location field that is referenced by the keyBy statement is actually a 
> double[]. May this cause the problems I'm experiencing?
> 
> You can find some more code for additional context in the attached document.
> 
> Thanks for looking into this!
> 
> Steffen
> 
> 
> 
> On 20/02/2017 15:22, Stefan Richter wrote:
>> Hi,
>> 
>> Flink 1.2 is partitioning all keys into key-groups, the atomic units for 
>> rescaling. This partitioning is done by hash partitioning and is also in 
>> sync with the routing of tuples to operator instances (each parallel 
>> instance of a keyed operator is responsible for some range of key groups). 
>> This exception means that Flink detected a tuple in the state backend of a 
>> parallel operator instance that should not be there because, by its key 
>> hash, it belongs to a different key-group. Or phrased differently, this 
>> tuple belongs to a different parallel operator instance. If this is a Flink 
>> bug or user code bug is very hard to tell, the log also does not provide 
>> additional insights. I could see this happen in case that your keys are 
>> mutable and your code makes some changes to the object that change the hash 
>> code. Another question is also: did you migrate your job from Flink 1.1.3 
>> through an old savepoint or did you do a fresh start. Other than that, I can 
>> recommend to check your code for mutating of keys. If this fails 
>> deterministically, you could also try to set a breakpoint for the line of 
>> the exception and take a look if the key that is about to be inserted is 
>> somehow special.
>> 
>> Best,
>> Stefan
>> 
>> 
>>> Am 20.02.2017 um 14:32 schrieb Steffen Hausmann 
>>> <stef...@hausmann-family.de>:
>>> 
>>> Hi there,
>>> 
>>> I’m having problems running a job on Flink 1.2.0 that successfully executes 
>>> on Flink 1.1.3. The job is supposed to read events from a Kinesis stream 
>>> and to send outputs to Elasticsearch and it actually initiates successfully 
>>> on a Flink 1.2.0 cluster running on YARN, but as soon as I start to ingest 
>>> events into the Kinesis stream, the job fails (see the attachment for more 
>>> information):
>>> 
>>> java.lang.RuntimeException: Unexpected key group index. This indicates a 
>>> bug.
>>> 
>>> at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>>> 
>>> at 
>>> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>>> 
>>> at 
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>>> 
>>> at 
>>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>>> 
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>> 
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>>> 
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>> 
>>> at java.lang.Thread.run(Thread.java:745)
>>> 
>>> Any ideas what’s going wrong here? The job executes successfully when it’s 
>>> compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 
>>> cluster. Does this indicate a bug in my code or is this rather a bug in 
>>> Flink? How can I further debug this?
>>> 
>>> Any guidance is highly appreciated.
>>> 
>>> Thanks,
>>> 
>>> Steffen
>>> 
>>> <log>
>> 
> <snipplet.java>

Reply via email to