I've filed a JIRA for the problem:
https://issues.apache.org/jira/browse/FLINK-5874

On Tue, Feb 21, 2017 at 4:09 PM, Stephan Ewen <se...@apache.org> wrote:

> @Steffen
>
> Yes, you can currently not use arrays as keys. There is a check missing
> that gives you a proper error message for that.
>
> The double[] is hashed on the sender side before sending it. Java's hash
> over an array does not take its contents into account, but the array's
> memory address, which makes it a non-deterministic hash.
> When the double is re-hashed on the receiver, you get a different hash,
> which is detected as violating the key groups.
>
> In fact, your program was probably behaving wrong before, but now you get
> a message for the error...
>
>
>
> On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <
> s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> if you key is a double[], even if the field is a final double[], it is
>> mutable because the array entries can be mutated and maybe that is what
>> happened? You can check if the following two points are in sync, hash-wise:
>> KeyGroupStreamPartitioner::selectChannels and
>> AbstractKeyedStateBackend::setCurrentKey. The first method basically
>> determines to which parallel operator a tuple is routed in a keyed stream.
>> The second is determining the tuple’s key group for the backend. Both must
>> be in sync w.r.t. their result of the key-group that is determined for the
>> tuple. And this assignment is done based on the hash of the key. Therefore,
>> the hash of the tuple’s key should never change and must be immutable. If
>> you can notice a change in hash code, that change is what breaks your code.
>> I am pretty sure that Flink 1.1.x might just silently accept a mutation of
>> the key, but actually this is arguably incorrect.
>>
>> Best,
>> Stefan
>>
>> > Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <
>> stef...@hausmann-family.de>:
>> >
>> > Thanks for these pointers, Stefan.
>> >
>> > I've started a fresh job and didn't migrate any state from previous
>> execution. Moreover, all the fields of all the events I'm using are
>> declared final.
>> >
>> > I've set a breakpoint to figure out what event is causing the problem,
>> and it turns out that Flink starts processing the incoming events for some
>> time and only when a certain window triggers an exception is thrown. The
>> specific code that causes the exception is as follows:
>> >
>> >> DataStream<IdleDuration> idleDuration = cleanedTrips
>> >>        .keyBy("license")
>> >>        .flatMap(new DetermineIdleDuration())
>> >>        .filter(duration -> duration.avg_idle_duration >= 0 &&
>> duration.avg_idle_duration <= 240)
>> >>        .keyBy("location")
>> >>        .timeWindow(Time.minutes(10))
>> >>        .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration>
>> input, Collector<IdleDuration> out) -> {
>> >>            double[] location = Iterables.get(input, 0).location;
>> >>            double avgDuration = StreamSupport
>> >>                    .stream(input.spliterator(), false)
>> >>                    .mapToDouble(idle -> idle.avg_idle_duration)
>> >>                    .average()
>> >>                    .getAsDouble();
>> >>
>> >>            out.collect(new IdleDuration(location, avgDuration,
>> window.maxTimestamp()));
>> >>        });
>> >
>> > If the apply statement is removed, there is no exception during runtime.
>> >
>> > The location field that is referenced by the keyBy statement is
>> actually a double[]. May this cause the problems I'm experiencing?
>> >
>> > You can find some more code for additional context in the attached
>> document.
>> >
>> > Thanks for looking into this!
>> >
>> > Steffen
>> >
>> >
>> >
>> > On 20/02/2017 15:22, Stefan Richter wrote:
>> >> Hi,
>> >>
>> >> Flink 1.2 is partitioning all keys into key-groups, the atomic units
>> for rescaling. This partitioning is done by hash partitioning and is also
>> in sync with the routing of tuples to operator instances (each parallel
>> instance of a keyed operator is responsible for some range of key groups).
>> This exception means that Flink detected a tuple in the state backend of a
>> parallel operator instance that should not be there because, by its key
>> hash, it belongs to a different key-group. Or phrased differently, this
>> tuple belongs to a different parallel operator instance. If this is a Flink
>> bug or user code bug is very hard to tell, the log also does not provide
>> additional insights. I could see this happen in case that your keys are
>> mutable and your code makes some changes to the object that change the hash
>> code. Another question is also: did you migrate your job from Flink 1.1.3
>> through an old savepoint or did you do a fresh start. Other than that, I
>> can recommend to check your code for mutating of keys. If this fails
>> deterministically, you could also try to set a breakpoint for the line of
>> the exception and take a look if the key that is about to be inserted is
>> somehow special.
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >>
>> >>> Am 20.02.2017 um 14:32 schrieb Steffen Hausmann <
>> stef...@hausmann-family.de>:
>> >>>
>> >>> Hi there,
>> >>>
>> >>> I’m having problems running a job on Flink 1.2.0 that successfully
>> executes on Flink 1.1.3. The job is supposed to read events from a Kinesis
>> stream and to send outputs to Elasticsearch and it actually initiates
>> successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I
>> start to ingest events into the Kinesis stream, the job fails (see the
>> attachment for more information):
>> >>>
>> >>> java.lang.RuntimeException: Unexpected key group index. This
>> indicates a bug.
>> >>>
>> >>> at org.apache.flink.runtime.state.heap.StateTable.set(StateTabl
>> e.java:57)
>> >>>
>> >>> at org.apache.flink.runtime.state.heap.HeapListState.add(HeapLi
>> stState.java:98)
>> >>>
>> >>> at org.apache.flink.streaming.runtime.operators.windowing.Windo
>> wOperator.processElement(WindowOperator.java:372)
>> >>>
>> >>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>> rocessInput(StreamInputProcessor.java:185)
>> >>>
>> >>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>> run(OneInputStreamTask.java:63)
>> >>>
>> >>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:272)
>> >>>
>> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> >>>
>> >>> at java.lang.Thread.run(Thread.java:745)
>> >>>
>> >>> Any ideas what’s going wrong here? The job executes successfully when
>> it’s compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3
>> cluster. Does this indicate a bug in my code or is this rather a bug in
>> Flink? How can I further debug this?
>> >>>
>> >>> Any guidance is highly appreciated.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Steffen
>> >>>
>> >>> <log>
>> >>
>> > <snipplet.java>
>>
>>
>

Reply via email to