Thanks Stefan and Stephan for your comments. I changed the type of the field and now the job seems to be running again.
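In case anyone else runs into this: the change was essentially to key on a value with a deterministic, content-based hash rather than on the raw double[]. A minimal sketch of the idea (illustrative only, not the exact code from my job; the stream name idleDurations is made up, and a KeySelector producing a Tuple2 is just one way to do it):

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // Key by the array's contents rather than by the array object itself.
    // Flink's Tuple2 implements hashCode()/equals() over its field values,
    // so sender and receiver compute the same key group for a record.
    KeyedStream<IdleDuration, Tuple2<Double, Double>> keyed = idleDurations
            .keyBy(new KeySelector<IdleDuration, Tuple2<Double, Double>>() {
                @Override
                public Tuple2<Double, Double> getKey(IdleDuration value) {
                    // Assumes the location array always has two entries.
                    return Tuple2.of(value.location[0], value.location[1]);
                }
            });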
And thanks Robert for filing the JIRA!

Cheers,
Steffen

On 21 February 2017 at 18:36:41 CET, Robert Metzger <rmetz...@apache.org> wrote:

> I've filed a JIRA for the problem:
> https://issues.apache.org/jira/browse/FLINK-5874
>
> On Tue, Feb 21, 2017 at 4:09 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> @Steffen
>>
>> Yes, you currently cannot use arrays as keys. There is a check missing that would give you a proper error message for that.
>>
>> The double[] is hashed on the sender side before it is sent. Java's hash over an array does not take its contents into account, but only the array's memory address, which makes it a non-deterministic hash. When the double[] is re-hashed on the receiver, you get a different hash, which is detected as violating the key groups.
>>
>> In fact, your program was probably behaving incorrectly before; now you at least get an error message for it.
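(Side note for the archives: Stephan's point about array hashing is easy to verify with plain Java. A minimal, self-contained sketch:)

    import java.util.Arrays;

    public class ArrayHashDemo {

        public static void main(String[] args) {
            double[] a = {13.4, 52.5};
            double[] b = {13.4, 52.5};

            // Arrays inherit Object.hashCode(), which is identity-based:
            // two arrays with identical contents (almost certainly) report
            // different hash codes.
            System.out.println(a.hashCode() == b.hashCode());

            // Arrays.hashCode() is content-based and therefore deterministic.
            System.out.println(Arrays.hashCode(a) == Arrays.hashCode(b));
        }
    }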
>> On Tue, Feb 21, 2017 at 3:14 PM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> if your key is a double[], it is mutable even when the field is declared final, because the array entries can still be changed; maybe that is what happened? You can check whether the following two points are in sync, hash-wise: KeyGroupStreamPartitioner::selectChannels and AbstractKeyedStateBackend::setCurrentKey. The first method determines to which parallel operator instance a tuple is routed in a keyed stream. The second determines the tuple's key group for the state backend. Both must agree on the key group they determine for the tuple, and this assignment is based on the hash of the key. Therefore, the hash of the tuple's key should never change, i.e. the key must be immutable. If you can observe a change in the hash code, that change is what breaks your code. I am pretty sure that Flink 1.1.x just silently accepted a mutation of the key, but that is arguably incorrect.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 21.02.2017 at 14:51, Steffen Hausmann <stef...@hausmann-family.de> wrote:
>>>
>>>> Thanks for these pointers, Stefan.
>>>>
>>>> I've started a fresh job and didn't migrate any state from a previous execution. Moreover, all the fields of all the events I'm using are declared final.
>>>>
>>>> I've set a breakpoint to figure out which event causes the problem, and it turns out that Flink processes the incoming events for some time; only when a certain window triggers is an exception thrown. The specific code that causes the exception is the following:
>>>>
>>>>> DataStream<IdleDuration> idleDuration = cleanedTrips
>>>>>     .keyBy("license")
>>>>>     .flatMap(new DetermineIdleDuration())
>>>>>     .filter(duration -> duration.avg_idle_duration >= 0 && duration.avg_idle_duration <= 240)
>>>>>     .keyBy("location")
>>>>>     .timeWindow(Time.minutes(10))
>>>>>     .apply((Tuple tuple, TimeWindow window, Iterable<IdleDuration> input, Collector<IdleDuration> out) -> {
>>>>>         double[] location = Iterables.get(input, 0).location;
>>>>>         double avgDuration = StreamSupport
>>>>>             .stream(input.spliterator(), false)
>>>>>             .mapToDouble(idle -> idle.avg_idle_duration)
>>>>>             .average()
>>>>>             .getAsDouble();
>>>>>
>>>>>         out.collect(new IdleDuration(location, avgDuration, window.maxTimestamp()));
>>>>>     });
>>>>
>>>> If the apply statement is removed, there is no exception at runtime.
>>>>
>>>> The location field that is referenced by the keyBy statement is actually a double[]. May this cause the problems I'm experiencing?
>>>>
>>>> You can find some more code for additional context in the attached document.
>>>>
>>>> Thanks for looking into this!
>>>>
>>>> Steffen
>>>>
>>>> On 20/02/2017 15:22, Stefan Richter wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Flink 1.2 partitions all keys into key groups, the atomic units for rescaling. This partitioning is done by hash partitioning and is also in sync with the routing of tuples to operator instances (each parallel instance of a keyed operator is responsible for some range of key groups). The exception means that Flink detected a tuple in the state backend of a parallel operator instance that should not be there because, by its key hash, it belongs to a different key group; phrased differently, the tuple belongs to a different parallel operator instance. Whether this is a Flink bug or a user-code bug is very hard to tell, and the log does not provide additional insight either. I could see this happening if your keys are mutable and your code makes changes to a key object that alter its hash code. Another question: did you migrate your job from Flink 1.1.3 through an old savepoint, or did you do a fresh start? Other than that, I can recommend checking your code for mutation of keys. If the job fails deterministically, you could also set a breakpoint at the line of the exception and check whether the key that is about to be inserted is somehow special.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> On 20.02.2017 at 14:32, Steffen Hausmann <stef...@hausmann-family.de> wrote:
>>>>>
>>>>>> Hi there,
>>>>>>
>>>>>> I'm having problems running a job on Flink 1.2.0 that executes successfully on Flink 1.1.3. The job is supposed to read events from a Kinesis stream and send its output to Elasticsearch. It does initialize successfully on a Flink 1.2.0 cluster running on YARN, but as soon as I start to ingest events into the Kinesis stream, the job fails (see the attachment for more information):
>>>>>>
>>>>>> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>>>>>>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>>>>>>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>>>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>>>>>>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Any ideas what's going wrong here? The job executes successfully when it's compiled against the Flink 1.1.3 artifacts and run on a Flink 1.1.3 cluster. Does this indicate a bug in my code, or is it rather a bug in Flink? How can I further debug this?
>>>>>>
>>>>>> Any guidance is highly appreciated.
>>>>>> Thanks,
>>>>>>
>>>>>> Steffen
>>>>>>
>>>>>> <log>
>>>>
>>>> <snipplet.java>
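PS: To make the key-group mechanics Stefan describes above a bit more concrete, this is conceptually what happens on the two code paths he names. A simplified sketch, not Flink's actual implementation (the real assignment also runs the hash through a murmur hash and works with key-group ranges):

    // Simplified sketch of the two code paths that must agree (not Flink's
    // actual code). Both derive the key group from the key's hashCode().
    public class KeyGroupSketch {

        // Sender side, cf. KeyGroupStreamPartitioner::selectChannels:
        // route the record to the operator instance owning its key group.
        static int selectChannel(Object key, int maxParallelism, int parallelism) {
            int keyGroup = assignToKeyGroup(key, maxParallelism);
            return keyGroup * parallelism / maxParallelism;
        }

        // Receiver side, cf. AbstractKeyedStateBackend::setCurrentKey:
        // compute the key group again to locate the state for this key.
        static int keyGroupForState(Object key, int maxParallelism) {
            return assignToKeyGroup(key, maxParallelism);
        }

        static int assignToKeyGroup(Object key, int maxParallelism) {
            // If key.hashCode() is not deterministic (e.g. a double[] hashes
            // by identity), sender and receiver disagree here, and the
            // "Unexpected key group index" check fires on the receiver.
            return Math.floorMod(key.hashCode(), maxParallelism);
        }
    }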