Thanks Stefan and Stephan for your comments. I changed the type of the field 
and now the job seems to be running again.

And thanks Robert for filing the Jira!

Cheers,
Steffen


Am 21. Februar 2017 18:36:41 MEZ schrieb Robert Metzger <rmetz...@apache.org>:
>I've filed a JIRA for the problem:
>https://issues.apache.org/jira/browse/FLINK-5874
>
>On Tue, Feb 21, 2017 at 4:09 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> @Steffen
>>
>> Yes, you can currently not use arrays as keys. There is a check
>missing
>> that gives you a proper error message for that.
>>
>> The double[] is hashed on the sender side before sending it. Java's
>hash
>> over an array does not take its contents into account, but the
>array's
>> memory address, which makes it a non-deterministic hash.
>> When the double is re-hashed on the receiver, you get a different
>hash,
>> which is detected as violating the key groups.
>>
>> In fact, your program was probably behaving wrong before, but now you
>get
>> a message for the error...
>>
>>
>>
>> On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> if you key is a double[], even if the field is a final double[], it
>is
>>> mutable because the array entries can be mutated and maybe that is
>what
>>> happened? You can check if the following two points are in sync,
>hash-wise:
>>> KeyGroupStreamPartitioner::selectChannels and
>>> AbstractKeyedStateBackend::setCurrentKey. The first method basically
>>> determines to which parallel operator a tuple is routed in a keyed
>stream.
>>> The second is determining the tuple’s key group for the backend.
>Both must
>>> be in sync w.r.t. their result of the key-group that is determined
>for the
>>> tuple. And this assignment is done based on the hash of the key.
>Therefore,
>>> the hash of the tuple’s key should never change and must be
>immutable. If
>>> you can notice a change in hash code, that change is what breaks
>your code.
>>> I am pretty sure that Flink 1.1.x might just silently accept a
>mutation of
>>> the key, but actually this is arguably incorrect.
>>>
>>> Best,
>>> Stefan
>>>
>>> > Am 21.02.2017 um 14:51 schrieb Steffen Hausmann <
>>> stef...@hausmann-family.de>:
>>> >
>>> > Thanks for these pointers, Stefan.
>>> >
>>> > I've started a fresh job and didn't migrate any state from
>previous
>>> execution. Moreover, all the fields of all the events I'm using are
>>> declared final.
>>> >
>>> > I've set a breakpoint to figure out what event is causing the
>problem,
>>> and it turns out that Flink starts processing the incoming events
>for some
>>> time and only when a certain window triggers an exception is thrown.
>The
>>> specific code that causes the exception is as follows:
>>> >
>>> >> DataStream<IdleDuration> idleDuration = cleanedTrips
>>> >>        .keyBy("license")
>>> >>        .flatMap(new DetermineIdleDuration())
>>> >>        .filter(duration -> duration.avg_idle_duration >= 0 &&
>>> duration.avg_idle_duration <= 240)
>>> >>        .keyBy("location")
>>> >>        .timeWindow(Time.minutes(10))
>>> >>        .apply((Tuple tuple, TimeWindow window,
>Iterable<IdleDuration>
>>> input, Collector<IdleDuration> out) -> {
>>> >>            double[] location = Iterables.get(input, 0).location;
>>> >>            double avgDuration = StreamSupport
>>> >>                    .stream(input.spliterator(), false)
>>> >>                    .mapToDouble(idle -> idle.avg_idle_duration)
>>> >>                    .average()
>>> >>                    .getAsDouble();
>>> >>
>>> >>            out.collect(new IdleDuration(location, avgDuration,
>>> window.maxTimestamp()));
>>> >>        });
>>> >
>>> > If the apply statement is removed, there is no exception during
>runtime.
>>> >
>>> > The location field that is referenced by the keyBy statement is
>>> actually a double[]. May this cause the problems I'm experiencing?
>>> >
>>> > You can find some more code for additional context in the attached
>>> document.
>>> >
>>> > Thanks for looking into this!
>>> >
>>> > Steffen
>>> >
>>> >
>>> >
>>> > On 20/02/2017 15:22, Stefan Richter wrote:
>>> >> Hi,
>>> >>
>>> >> Flink 1.2 is partitioning all keys into key-groups, the atomic
>units
>>> for rescaling. This partitioning is done by hash partitioning and is
>also
>>> in sync with the routing of tuples to operator instances (each
>parallel
>>> instance of a keyed operator is responsible for some range of key
>groups).
>>> This exception means that Flink detected a tuple in the state
>backend of a
>>> parallel operator instance that should not be there because, by its
>key
>>> hash, it belongs to a different key-group. Or phrased differently,
>this
>>> tuple belongs to a different parallel operator instance. If this is
>a Flink
>>> bug or user code bug is very hard to tell, the log also does not
>provide
>>> additional insights. I could see this happen in case that your keys
>are
>>> mutable and your code makes some changes to the object that change
>the hash
>>> code. Another question is also: did you migrate your job from Flink
>1.1.3
>>> through an old savepoint or did you do a fresh start. Other than
>that, I
>>> can recommend to check your code for mutating of keys. If this fails
>>> deterministically, you could also try to set a breakpoint for the
>line of
>>> the exception and take a look if the key that is about to be
>inserted is
>>> somehow special.
>>> >>
>>> >> Best,
>>> >> Stefan
>>> >>
>>> >>
>>> >>> Am 20.02.2017 um 14:32 schrieb Steffen Hausmann <
>>> stef...@hausmann-family.de>:
>>> >>>
>>> >>> Hi there,
>>> >>>
>>> >>> I’m having problems running a job on Flink 1.2.0 that
>successfully
>>> executes on Flink 1.1.3. The job is supposed to read events from a
>Kinesis
>>> stream and to send outputs to Elasticsearch and it actually
>initiates
>>> successfully on a Flink 1.2.0 cluster running on YARN, but as soon
>as I
>>> start to ingest events into the Kinesis stream, the job fails (see
>the
>>> attachment for more information):
>>> >>>
>>> >>> java.lang.RuntimeException: Unexpected key group index. This
>>> indicates a bug.
>>> >>>
>>> >>> at org.apache.flink.runtime.state.heap.StateTable.set(StateTabl
>>> e.java:57)
>>> >>>
>>> >>> at org.apache.flink.runtime.state.heap.HeapListState.add(HeapLi
>>> stState.java:98)
>>> >>>
>>> >>> at org.apache.flink.streaming.runtime.operators.windowing.Windo
>>> wOperator.processElement(WindowOperator.java:372)
>>> >>>
>>> >>> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.p
>>> rocessInput(StreamInputProcessor.java:185)
>>> >>>
>>> >>> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.
>>> run(OneInputStreamTask.java:63)
>>> >>>
>>> >>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:272)
>>> >>>
>>> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>> >>>
>>> >>> at java.lang.Thread.run(Thread.java:745)
>>> >>>
>>> >>> Any ideas what’s going wrong here? The job executes successfully
>when
>>> it’s compiled against the Flink 1.1.3 artifacts and run on a Flink
>1.1.3
>>> cluster. Does this indicate a bug in my code or is this rather a bug
>in
>>> Flink? How can I further debug this?
>>> >>>
>>> >>> Any guidance is highly appreciated.
>>> >>>
>>> >>> Thanks,
>>> >>>
>>> >>> Steffen
>>> >>>
>>> >>> <log>
>>> >>
>>> > <snipplet.java>
>>>
>>>
>>

Reply via email to