Yes, my code should be doing that. We mostly keyBy a userId. I'm guessing there is a subtle bug that's causing issues. My plan is to log more details about the input that triggers the key error and to look at the raw inputs that affect that element's key. I was curious whether people have found other tools that help.
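For illustration, here is a minimal sketch of the kind of wrapper check I have in mind, in plain Java. It is not a Flink utility: `KeyChangeGuard`, `inputKey`, `outputKey` and `check` are hypothetical names, and the two functions stand in for keyExtractor1 and keyExtractor2 from the reply below. The guard compares the key of a record going into the first keyed function with the key of the record it emits, and fails with both records in the message so the offending input can be logged.

```java
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink): verifies that the key extracted
 * from an operator's input equals the key extracted from its output.
 */
final class KeyChangeGuard<IN, OUT, K> {
    private final Function<IN, K> inputKey;   // plays the role of keyExtractor1
    private final Function<OUT, K> outputKey; // plays the role of keyExtractor2

    KeyChangeGuard(Function<IN, K> inputKey, Function<OUT, K> outputKey) {
        this.inputKey = Objects.requireNonNull(inputKey);
        this.outputKey = Objects.requireNonNull(outputKey);
    }

    /** Returns the output unchanged, or throws if the two keys differ. */
    OUT check(IN input, OUT output) {
        K before = inputKey.apply(input);
        K after = outputKey.apply(output);
        if (!Objects.equals(before, after)) {
            // Include both records so the raw input behind the bad key is visible.
            throw new IllegalStateException(
                "Key changed from " + before + " to " + after
                    + " (input=" + input + ", output=" + output + ")");
        }
        return output;
    }
}
```

The idea would be to call `check(input, output)` inside the first keyed function just before emitting each record, so a mismatch surfaces next to the input that caused it rather than later in the timer service. Equal keys are only a necessary condition here; the sketch does not look at key groups or parallelism.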

On Thu, Oct 7, 2021 at 11:20 PM Schwalbe Matthias <[email protected]> wrote:

> Good morning Dan,
>
> Being short of information on how you arranged your job, I can only make
> general comments:
>
> ReinterpretAsKeyedStream *only* applies to data streams that are in fact
> partitioned by the same key, i.e. your job would look somewhat like this:
>
> DataStreamUtils.reinterpretAsKeyedStream(
>     Stream
>         .keyBy(keyExtractor1)
>         .process(keyedProcessFunction1) // or any of the other keyed operators
>     , keyExtractor2 …
> )
> .process(keyedProcessFunction2) // or any of the other keyed operators
>
> keyExtractor1 and keyExtractor2 need to come to the same result for
> related events (input and output of keyedProcessFunction1, respectively).
>
> I assume your exception happens in keyedProcessFunction2?
>
> reinterpretAsKeyedStream makes sense if you want to chain
> keyedProcessFunction1 and keyedProcessFunction2, otherwise keyBy() will do …
>
> I hope these hints help, otherwise feel free to get back to the mailing
> list with a more detailed description of your arrangement 😊
>
> Cheers
>
> Thias
>
> *From:* Dan Hill <[email protected]>
> *Sent:* Friday, 8 October 2021 06:49
> *To:* user <[email protected]>
> *Subject:* Helper methods for catching unexpected key changes?
>
> Hi. I'm getting the following errors when using
> reinterpretAsKeyedStream. I don't expect the key to change for rows in
> reinterpretAsKeyedStream. Are there any utilities that I can use with
> reinterpretAsKeyedStream to verify that the key doesn't change? E.g. some
> wrapper operator?
>
> 2021-10-02 16:38:46
> java.lang.IllegalArgumentException: key group from 154 to 156 does not contain 213
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
