Re: Setting uid hash for non-legacy sinks
Hey Salva,

Good to hear your issue has been resolved one way or another! Thanks for confirming that this operator hash trick works on V2 as well.

G

On Wed, Jun 12, 2024 at 5:20 AM Salva Alcántara wrote:
> Hey Gabor,
>
> In the end I didn't need to keep compatibility with existing savepoints, so I
> restarted my jobs from scratch, so to speak.
>
> However, I gave it a try locally, and by debugging the job graph I can see
> the hashes I set manually following your recipe (see the
> userDefinedOperatorIDs), so it seems to work fine!
>
> Regards,
>
> Salva
>
> On Mon, Jun 10, 2024 at 9:49 AM Gabor Somogyi wrote:
>> YW, ping me back to let me know whether it works, because it's a nifty feature.
>>
>> G
Re: Setting uid hash for non-legacy sinks
YW, ping me back to let me know whether it works, because it's a nifty feature.

G

On Mon, Jun 10, 2024 at 9:26 AM Salva Alcántara wrote:
> Thanks Gabor, I will give it a try!
Re: Setting uid hash for non-legacy sinks
Thanks Gabor, I will give it a try!

On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi wrote:
> Now I see the intention. Then you must have a V2 sink, right? Maybe you are
> looking for the following:
>
> ```
> final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
> final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
> final CustomSinkOperatorUidHashes operatorsUidHashes =
>         CustomSinkOperatorUidHashes.builder()
>                 .setWriterUidHash(writerHash)
>                 .setCommitterUidHash(committerHash)
>                 .build();
> src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
> ```
>
> G
Re: Setting uid hash for non-legacy sinks
Now I see the intention. Then you must have a V2 sink, right? Maybe you are looking for the following:

```
final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
final CustomSinkOperatorUidHashes operatorsUidHashes =
        CustomSinkOperatorUidHashes.builder()
                .setWriterUidHash(writerHash)
                .setCommitterUidHash(committerHash)
                .build();
src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
```

G

On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen wrote:
> Hi Salva,
>
> The SinkV2 transformation is translated into multiple operators at the
> physical level. When you set a UID, Flink automatically generates UIDs for
> the sub-operators by filling the configured UID into a predefined naming
> template. The naming template is carefully maintained to ensure
> cross-version state compatibility. However, this cannot easily be done when
> setting the uidHash, which is why Flink currently does not support setting
> it for non-legacy sinks.
>
> A possible solution is to convert the uidHash back to the uid and apply the
> same UID-generation strategy to compute the corresponding uidHash for each
> sub-operator. Maybe you can investigate this further and file a JIRA issue
> for it.
>
> Best,
> Zhanghao Chen
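The writer and committer hashes in Gabor's snippet are 32-character hex strings, i.e. 16 bytes, the same size as Flink's operator IDs. As far as we can tell, Flink's `Transformation#setUidHash` enforces this format with a `^[0-9A-Fa-f]{32}$` precondition, so a value like `"xyzb"` would presumably be rejected even where a custom uid hash is supported. Below is a minimal plain-Java sketch for sanity-checking hashes copied from old logs before handing them to `CustomSinkOperatorUidHashes`; `UidHashCheck` and its methods are hypothetical helpers, not Flink API, and the regex is our assumption about Flink's check.

```java
import java.util.regex.Pattern;

/** Hypothetical helper: sanity-checks operator uid hashes before passing them to Flink. */
final class UidHashCheck {
    // Assumed to mirror the precondition in Flink's Transformation#setUidHash:
    // exactly 32 hexadecimal characters (16 bytes).
    private static final Pattern UID_HASH = Pattern.compile("^[0-9A-Fa-f]{32}$");

    static boolean isValidUidHash(String hash) {
        return hash != null && UID_HASH.matcher(hash).matches();
    }

    /** Decodes a valid hash into its 16 raw bytes, first hex pair first. */
    static byte[] toBytes(String hash) {
        if (!isValidUidHash(hash)) {
            throw new IllegalArgumentException("Not a 32-character hex uid hash: " + hash);
        }
        byte[] bytes = new byte[16];
        for (int i = 0; i < 16; i++) {
            bytes[i] = (byte) Integer.parseInt(hash.substring(2 * i, 2 * i + 2), 16);
        }
        return bytes;
    }

    public static void main(String[] args) {
        String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
        String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
        System.out.println(isValidUidHash(writerHash));    // true
        System.out.println(isValidUidHash(committerHash)); // true
        System.out.println(isValidUidHash("xyzb"));        // false: too short
    }
}
```

Checking the format up front gives a clear local error instead of a failure deep inside job-graph generation.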
Re: Setting uid hash for non-legacy sinks
Hi Salva,

The SinkV2 transformation is translated into multiple operators at the physical level. When you set a UID, Flink automatically generates UIDs for the sub-operators by filling the configured UID into a predefined naming template. The naming template is carefully maintained to ensure cross-version state compatibility. However, this cannot easily be done when setting the uidHash, which is why Flink currently does not support setting it for non-legacy sinks.

A possible solution is to convert the uidHash back to the uid and apply the same UID-generation strategy to compute the corresponding uidHash for each sub-operator. Maybe you can investigate this further and file a JIRA issue for it.

Best,
Zhanghao Chen

From: Salva Alcántara
Sent: Sunday, June 9, 2024 14:49
To: Gabor Somogyi
Cc: user
Subject: Re: Setting uid hash for non-legacy sinks

Hi Gabor,

Yeah, I know this, but what if you initially forgot and now want to add the uid "after the fact"?

You need to get the operator/vertex ID that Flink used for the current savepoints and somehow set this ID for the sink.

With the uid method you would need to "hack" the existing hash (find a string which, when hashed, produces it). I guess this can be done since murmur3 is a non-cryptographic hash, but Flink has "setUidHash" precisely for that. From the javadocs:

> *Important*: this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator.

...but as I said, it seems this method is not supported for new (non-legacy) sinks...

Regards,

Salva
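Zhanghao's point about the naming template can be illustrated in plain Java: a single user-supplied uid is expanded into one uid per sub-operator, and each of those is hashed separately, whereas a bare uid hash gives the template nothing to work with. This sketch assumes a hypothetical template (`"%s-writer"`, `"%s-committer"`) that is not necessarily the one Flink uses, and it uses MD5 from `java.security.MessageDigest` purely as a stand-in 128-bit hash; Flink's own uid hashing is murmur3-based, as Salva mentions. `SubOperatorIds` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustration only: one user uid fans out to several sub-operator uids, each with its own ID. */
final class SubOperatorIds {
    // Hypothetical naming template; not necessarily what Flink uses for SinkV2 sub-operators.
    private static final String[] TEMPLATES = {"%s-writer", "%s-committer"};

    /** Maps each derived sub-operator uid to a 32-hex-char ID, in template order. */
    static Map<String, String> derive(String userUid) {
        Map<String, String> ids = new LinkedHashMap<>();
        for (String template : TEMPLATES) {
            String subUid = String.format(template, userUid);
            ids.put(subUid, standInHash(subUid));
        }
        return ids;
    }

    /** Stand-in 128-bit hash (MD5), NOT Flink's murmur3-based hashing. */
    static String standInHash(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(32);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 must be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        // With a uid, every sub-operator ID can be recomputed from the template.
        derive("my-human-readable-sink-uid").forEach((uid, id) -> System.out.println(uid + " -> " + id));
        // With only a uid hash there is no string to plug into the template,
        // so per-sub-operator hashes cannot be derived this way.
    }
}
```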
Re: Setting uid hash for non-legacy sinks
Hi Gabor,

Yeah, I know this, but what if you initially forgot and now want to add the uid "after the fact"?

You need to get the operator/vertex ID that Flink used for the current savepoints and somehow set this ID for the sink.

With the uid method you would need to "hack" the existing hash (find a string which, when hashed, produces it). I guess this can be done since murmur3 is a non-cryptographic hash, but Flink has "setUidHash" precisely for that. From the javadocs:

> *Important*: this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator.

...but as I said, it seems this method is not supported for new (non-legacy) sinks...

Regards,

Salva

On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi wrote:
> Hi Salva,
>
> Just wondering, why not simply set the uid like this?
> ```
> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
> ```
>
> Flink derives the hash from the given UID, and that UID -> HASH
> transformation is consistent.
>
> BR,
> G
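To illustrate the point that an operator hash is a deterministic function of the uid, here is a standalone Java sketch of MurmurHash3 x64 128-bit with seed 0. It assumes, based on our reading of Flink's `StreamGraphHasherV2`, that a user-specified uid is UTF-8 encoded and hashed with Guava's `Hashing.murmur3_128(0)`, and that the 16 result bytes are shown as 32 hex characters; verify this against the Flink version you run. It only covers plain `uid(...)` operators: SinkV2 sub-operators get their uids from Flink's naming template, and jobs without uids get topology-based hashes, neither of which this reproduces. Finding a uid that maps to a given hash (the "hack" described above) would mean inverting this function, which the sketch does not attempt. `UidHasher` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

/** Standalone MurmurHash3 x64 128-bit (seed 0), used to mimic a uid -> operator hash mapping. */
final class UidHasher {
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    /** Hashes the UTF-8 bytes of {@code uid} and returns 32 lowercase hex characters. */
    static String uidToHash(String uid) {
        byte[] data = uid.getBytes(StandardCharsets.UTF_8);
        int len = data.length;
        int blocks = len / 16;
        long h1 = 0; // seed 0
        long h2 = 0;

        // Body: each 16-byte block is read as two little-endian longs.
        for (int i = 0; i < blocks; i++) {
            h1 ^= mixK1(getLongLE(data, i * 16));
            h1 = Long.rotateLeft(h1, 27) + h2;
            h1 = h1 * 5 + 0x52dce729;
            h2 ^= mixK2(getLongLE(data, i * 16 + 8));
            h2 = Long.rotateLeft(h2, 31) + h1;
            h2 = h2 * 5 + 0x38495ab5;
        }

        // Tail: the remaining 0..15 bytes, little-endian, split across two lanes.
        int tail = len & 15;
        int base = blocks * 16;
        long t1 = 0;
        long t2 = 0;
        for (int j = tail - 1; j >= 8; j--) {
            t2 ^= (data[base + j] & 0xffL) << ((j - 8) * 8);
        }
        for (int j = Math.min(tail, 8) - 1; j >= 0; j--) {
            t1 ^= (data[base + j] & 0xffL) << (j * 8);
        }
        if (tail > 8) {
            h2 ^= mixK2(t2);
        }
        if (tail > 0) {
            h1 ^= mixK1(t1);
        }

        // Finalization: fold in the length and avalanche both halves.
        h1 ^= len;
        h2 ^= len;
        h1 += h2;
        h2 += h1;
        h1 = fmix64(h1);
        h2 = fmix64(h2);
        h1 += h2;
        h2 += h1;

        // Emit h1 then h2, each as little-endian bytes (assumed to match Guava's byte order).
        StringBuilder hex = new StringBuilder(32);
        appendLittleEndianHex(hex, h1);
        appendLittleEndianHex(hex, h2);
        return hex.toString();
    }

    private static long mixK1(long k1) {
        return Long.rotateLeft(k1 * C1, 31) * C2;
    }

    private static long mixK2(long k2) {
        return Long.rotateLeft(k2 * C2, 33) * C1;
    }

    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    private static long getLongLE(byte[] b, int off) {
        long v = 0;
        for (int i = 7; i >= 0; i--) {
            v = (v << 8) | (b[off + i] & 0xffL);
        }
        return v;
    }

    private static void appendLittleEndianHex(StringBuilder sb, long v) {
        for (int i = 0; i < 8; i++) {
            sb.append(String.format("%02x", (v >>> (8 * i)) & 0xff));
        }
    }

    public static void main(String[] args) {
        System.out.println(uidToHash("my-human-readable-sink-uid"));
    }
}
```

The same uid always yields the same 32-character value, which is why a stable, human-readable uid is the preferred way to keep state mappings across job changes.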
Re: Setting uid hash for non-legacy sinks
Hi Salva,

Just wondering, why not simply set the uid like this?
```
output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
```

Flink derives the hash from the given UID, and that UID -> HASH transformation is consistent.

BR,
G

On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara wrote:
> Hi,
>
> I want to add the uid for my Kafka sink in such a way that I can still use
> the existing savepoint. The problem I'm having is that I cannot set the uid
> hash. If I try something like this:
>
> ```
> output.sinkTo(outputSink).setUidHash("xyzb");
> ```
>
> I get the following error:
>
> ```
> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink
>     at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
> ```
>
> How can one set the operator ID directly for new (non-legacy) sinks, then?
>
> Kind regards,
>
> Salva