I've filed a JIRA for the problem: https://issues.apache.org/jira/browse/FLINK-5874
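The sketch below is a minimal plain-Java illustration of the root cause Stephan describes in the quoted thread. It does not use Flink. A double[] has identity-based equals() and hashCode(), so a copy that arrives on another task is a different object and almost certainly has a different hash. java.util.Arrays.hashCode, by contrast, depends only on the contents.

Some parts are assumptions for illustration: Java serialization stands in for Flink's own network serialization, and the class name ArrayKeyHashDemo and the sample coordinates are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;

public class ArrayKeyHashDemo {

    // Simulates shipping a key to another task: serialize it, then deserialize a fresh copy.
    // Java serialization is only a stand-in here; Flink uses its own serializers.
    static double[] roundTrip(double[] key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(key);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (double[]) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        double[] sent = {40.7128, -74.0060};   // hypothetical location key
        double[] received = roundTrip(sent);   // what the receiving task would see

        // Same contents, but a different array object.
        System.out.println("Arrays.equals: " + Arrays.equals(sent, received)); // true
        System.out.println("Object.equals: " + sent.equals(received));         // false

        // Identity-based hashes: two distinct objects, so the values (almost always) differ.
        System.out.println("hashCode() on sender:   " + sent.hashCode());
        System.out.println("hashCode() on receiver: " + received.hashCode());

        // Content-based hash: always identical for equal contents.
        System.out.println("Arrays.hashCode equal: "
            + (Arrays.hashCode(sent) == Arrays.hashCode(received)));          // true
    }
}
```

The two hashCode() values printed by main vary from run to run. That variation is exactly the non-determinism that trips the key-group check.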
On Tue, Feb 21, 2017 at 4:09 PM, Stephan Ewen <se...@apache.org> wrote:

> @Steffen
>
> Yes, you currently cannot use arrays as keys. A check that would give you a
> proper error message for that is missing.
>
> The double[] is hashed on the sender side before it is sent. Java's
> hashCode() for an array does not take the array's contents into account;
> it is an identity hash of the array object, which makes it
> non-deterministic across copies of the same values.
> When the double[] is re-hashed on the receiver, you get a different hash,
> which is detected as a violation of the key groups.
>
> In fact, your program was probably behaving incorrectly before, but now you
> get an error message for it...
>
> On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>
>> Hi,
>>
>> if your key is a double[], it is mutable even if the field is declared
>> final, because the array entries can still be mutated; maybe that is what
>> happened? You can check whether the following two points are in sync,
>> hash-wise: KeyGroupStreamPartitioner::selectChannels and
>> AbstractKeyedStateBackend::setCurrentKey. The first method determines to
>> which parallel operator instance a tuple is routed in a keyed stream. The
>> second determines the tuple’s key group for the state backend. Both must
>> compute the same key group for the tuple, and this assignment is based on
>> the hash of the key. Therefore, the hash of the tuple’s key must never
>> change, i.e. the key must be immutable. If you notice a change in the hash
>> code, that change is what breaks your code. I am pretty sure that Flink
>> 1.1.x just silently accepts a mutation of the key, but that is arguably
>> incorrect.
>>
>> Best,
>> Stefan
>>
>> > On 21.02.2017 at 14:51, Steffen Hausmann <stef...@hausmann-family.de> wrote:
>> >
>> > Thanks for these pointers, Stefan.
>> >
>> > I've started a fresh job and didn't migrate any state from a previous
>> > execution.
>> > Moreover, all the fields of all the events I'm using are declared final.
>> >
>> > I've set a breakpoint to figure out which event is causing the problem,
>> > and it turns out that Flink processes the incoming events for some time;
>> > the exception is only thrown when a certain window triggers. The
>> > specific code that causes the exception is as follows:
>> >
>> >     DataStream<IdleDuration> idleDuration = cleanedTrips
>> >         .keyBy("license")
>> >         .flatMap(new DetermineIdleDuration())
>> >         .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240)
>> >         .keyBy("location")
>> >         .timeWindow(Time.minutes(10))
>> >         .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, Collector<IdleDuration> out) -> {
>> >             double[] location = Iterables.get(input, 0).location;
>> >             double avgDuration = StreamSupport
>> >                 .stream(input.spliterator(), false)
>> >                 .mapToDouble(idle -> idle.avg_idle_duration)
>> >                 .average()
>> >                 .getAsDouble();
>> >
>> >             out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
>> >         });
>> >
>> > If the apply statement is removed, there is no exception at runtime.
>> >
>> > The location field that is referenced by the keyBy statement is
>> > actually a double[]. Could this cause the problems I'm experiencing?
>> >
>> > You can find some more code for additional context in the attached
>> > document.
>> >
>> > Thanks for looking into this!
>> >
>> > Steffen
>> >
>> > On 20/02/2017 15:22, Stefan Richter wrote:
>> >> Hi,
>> >>
>> >> Flink 1.2 partitions all keys into key groups, the atomic units for
>> >> rescaling. This partitioning is done by hash partitioning and is also
>> >> in sync with the routing of tuples to operator instances (each parallel
>> >> instance of a keyed operator is responsible for some range of key
>> >> groups).
>> >> This exception means that Flink detected a tuple in the state backend
>> >> of a parallel operator instance that should not be there because, by
>> >> its key hash, it belongs to a different key group. Phrased differently,
>> >> this tuple belongs to a different parallel operator instance. Whether
>> >> this is a Flink bug or a bug in user code is very hard to tell, and the
>> >> log does not provide additional insights either. I could see this
>> >> happening if your keys are mutable and your code makes changes to the
>> >> key object that change its hash code. Another question: did you migrate
>> >> your job from Flink 1.1.3 through an old savepoint, or did you do a
>> >> fresh start? Other than that, I recommend checking your code for
>> >> mutations of keys. If this fails deterministically, you could also set
>> >> a breakpoint on the line that throws the exception and check whether
>> >> the key that is about to be inserted is somehow special.
>> >>
>> >> Best,
>> >> Stefan
>> >>
>> >>> On 20.02.2017 at 14:32, Steffen Hausmann <stef...@hausmann-family.de> wrote:
>> >>>
>> >>> Hi there,
>> >>>
>> >>> I’m having problems running a job on Flink 1.2.0 that executes
>> >>> successfully on Flink 1.1.3. The job is supposed to read events from a
>> >>> Kinesis stream and send outputs to Elasticsearch. It starts
>> >>> successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I
>> >>> start to ingest events into the Kinesis stream, the job fails (see the
>> >>> attachment for more information):
>> >>>
>> >>> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>> >>>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>> >>>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>> >>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>> >>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>> >>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>> >>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>> >>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>> >>>     at java.lang.Thread.run(Thread.java:745)
>> >>>
>> >>> Any ideas what’s going wrong here? The job executes successfully when
>> >>> it’s compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3
>> >>> cluster. Does this indicate a bug in my code, or is it rather a bug in
>> >>> Flink? How can I debug this further?
>> >>>
>> >>> Any guidance is highly appreciated.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Steffen
>> >>>
>> >>> <log>
>> >
>> > <snipplet.java>
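The sketch below illustrates the consistency requirement Stefan describes: the partitioner and the state backend must derive the same key group from the key's hash. It uses a scheme modeled loosely on Flink 1.2 (key hash → key group → owning operator instance), but it is a simplified version under stated assumptions:

- The bit-mixing step is a stand-in, not Flink's actual murmur-hash step.
- LocationKey is a hypothetical replacement for the double[] location field. It copies the coordinates into final fields and derives equals()/hashCode() from their values, so the sender and the receiver compute the same key group for equal locations.

```java
import java.util.Arrays;
import java.util.Objects;

public class KeyGroupSketch {

    // Hypothetical value-based key replacing the double[] location (name is illustrative).
    static final class LocationKey {
        private final double lat;
        private final double lon;

        LocationKey(double[] location) {
            // Copy the values out, so later mutation of the array cannot change the key.
            this.lat = location[0];
            this.lon = location[1];
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LocationKey)) return false;
            LocationKey other = (LocationKey) o;
            return Double.compare(lat, other.lat) == 0 && Double.compare(lon, other.lon) == 0;
        }

        @Override
        public int hashCode() {
            return Objects.hash(lat, lon); // derived from the contents only
        }
    }

    // Simplified key-group assignment; the mixing step is a stand-in, not Flink's murmur hash.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16);
        return Math.floorMod(h, maxParallelism);
    }

    // Maps a key group to the parallel operator instance whose key-group range contains it.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;

        double[] location = {40.7128, -74.0060};
        double[] copy = Arrays.copyOf(location, location.length); // "deserialized" copy

        // Value-based key: sender (routing) and receiver (state backend) agree.
        int senderGroup = keyGroupFor(new LocationKey(location), maxParallelism);
        int receiverGroup = keyGroupFor(new LocationKey(copy), maxParallelism);
        System.out.println("value key groups agree: " + (senderGroup == receiverGroup)); // true
        System.out.println("owning operator index:  "
            + operatorIndexFor(senderGroup, maxParallelism, parallelism));

        // Raw array as key: each side hashes a different object, so the groups may disagree.
        System.out.println("array key, sender group:   " + keyGroupFor(location, maxParallelism));
        System.out.println("array key, receiver group: " + keyGroupFor(copy, maxParallelism));
    }
}
```

In the job itself, the same idea could be applied by keying with a KeySelector that returns such a value-based key, instead of keyBy("location"). A String or Tuple2<Double, Double> built from the array would also work. This is a suggestion, not something confirmed in the thread.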