Yes, my code should be doing that.  We mostly keyBy a userId.  I'm guessing
there is a subtle bug that's causing issues.  My plan is to log more
details about the input causing the key issue and looking at the raw inputs
that impact that element's key.  I was curious if people have found other
tools that help.

On Thu, Oct 7, 2021 at 11:20 PM Schwalbe Matthias <
[email protected]> wrote:

> Good morning Dan,
>
>
>
> Being short of information on how you arranged your job, I can only make
> general comments:
>
>
>
> ReinterpretAsKeyedStream *only* applies to data streams that are in fact
> partitioned by the same key, i.e. your job would look somewhat like this:
>
>
>
> DataStreamUtils.reinterpretAsKeyedStream(
>
> Stream
>
> .keyBy(keyExtractor1)
>
> .process(keyedProcessFunction1)//or any of the other keyed operators
>
> ,keyExtractor2 …
>
> )
>
> .process(keyedProcessFunction2) //or any of the other keyed operators
>
>
>
> keyExtractor1 and keyExtractor2 need to come to the same result for
> related events (input/output of keyedProcessFuntion1 resp.)
>
>
>
> I assume your exception happens in keyedProcessFunction2?
>
>
>
> reinterpretAsKeyedStream makes sense if you want to chain
> keyedProcessFunction1 and keyedProcessFunction2, otherwise keyBy() will do …
>
>
>
> I hope these hints help, otherwise feel free to get back to the mailing
> list with a more detailed description of your arrangement 😊
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
>
>
> *From:* Dan Hill <[email protected]>
> *Sent:* Freitag, 8. Oktober 2021 06:49
> *To:* user <[email protected]>
> *Subject:* Helper methods for catching unexpected key changes?
>
>
>
> Hi.  I'm getting the following errors when using
> reinterpretAsKeyedStream.  I don't expect the key to change for rows in
> reinterpretAsKeyedStream.  Are there any utilities that I can use that I
> can use with reinterpetAsKeyedStream to verify that the key doesn't
> change?  E.g. some wrapper operator?
>
>
>
>
>
>
> 2021-10-02 16:38:46
> java.lang.IllegalArgumentException: key group from 154 to 156 does not
> contain 213
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
> at
> org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

Reply via email to