Thanks Gabor, I will give it a try!

On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
> Now I see the intention. Then you must have a V2 sink, right? Maybe you are looking for the following:
>
>     final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
>     final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
>     final CustomSinkOperatorUidHashes operatorsUidHashes =
>         CustomSinkOperatorUidHashes.builder()
>             .setWriterUidHash(writerHash)
>             .setCommitterUidHash(committerHash)
>             .build();
>     src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
>
> G
>
>
> On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <zhanghao.c...@outlook.com> wrote:
>
>> Hi Salva,
>>
>> The SinkV2 transformation is translated into multiple operators at the physical level. When a UID is set, Flink automatically generates UIDs for the sub-operators by filling the configured UID into a pre-defined naming template. The naming template is carefully maintained to ensure cross-version state compatibility. However, this cannot easily be done when setting the uidHash, which is why Flink currently does not support setting it for non-legacy sinks.
>>
>> A possible solution is to convert the uidHash back to the uid and apply the same strategy used for generating uids to compute the corresponding uidHash for each sub-operator. Maybe you can investigate this further and file a JIRA issue for it.
>>
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* Salva Alcántara <salcantara...@gmail.com>
>> *Sent:* Sunday, June 9, 2024 14:49
>> *To:* Gabor Somogyi <gabor.g.somo...@gmail.com>
>> *Cc:* user <user@flink.apache.org>
>> *Subject:* Re: Setting uid hash for non-legacy sinks
>>
>> Hi Gabor,
>>
>> Yeah, I know this, but what if you initially forgot and now you want to add the uid "after the fact"?
>>
>> You need to get the operator/vertex id used by Flink for current savepoints and somehow set this id for the sink.
>>
>> With the uid method you would need to "hack" the existing hash (get a string which, when hashed, produces it).
>> I guess this can be done, since murmur3 is a non-cryptographic hash, but Flink has "setUidHash" precisely for that. From the javadocs:
>>
>> *Important*: this should be used as a workaround or for troubleshooting. The provided hash needs to be unique per transformation and job. A use case for this is in migration between Flink versions or changing the jobs in a way that changes the automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
>>
>> ...but as I said, it seems this method is not supported for new (non-legacy) sinks...
>>
>> Regards,
>>
>> Salva
>>
>> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>> Hi Salva,
>>
>> Just wondering, why isn't it good enough to set the uid like this?
>> ```
>> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
>> ```
>>
>> Flink computes the hash from the given UID, and the UID -> HASH transformation is consistent.
>>
>> BR,
>> G
>>
>> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com> wrote:
>>
>> Hi,
>>
>> I want to add the uid for my Kafka sink in such a way that I can still use the existing savepoint. The problem I'm having is that I cannot set the uid hash. If I try something like this:
>>
>> ```
>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>> ```
>>
>> I get the following error:
>>
>> ```
>> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink
>>     at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>> ```
>>
>> How, then, can one set the operator id directly for new (non-legacy) sinks?
>>
>> Kind regards,
>>
>> Salva
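Gabor's point that the UID -> HASH transformation is consistent, and Salva's remark about murmur3, can be made concrete with a small standalone sketch. It rests on an assumption worth verifying against your Flink version: that a user-specified uid is hashed with (shaded) Guava's `Hashing.murmur3_128(0)` over the uid's UTF-8 bytes, and that those 16 bytes, written as 32 hex characters, are the operator ID that appears in logs and savepoints and that `setUidHash` expects. The class `UidHashSketch` and its methods are hypothetical. This is an unofficial reimplementation of MurmurHash3 x64_128, not Flink code, and it does not model the SinkV2 sub-operator naming template Zhanghao describes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch: MurmurHash3 x64_128 with seed 0, ASSUMED to match how
 * Flink turns a user-specified uid into an operator hash. Not Flink code.
 */
public final class UidHashSketch {

    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    private UidHashSketch() {}

    /** Final avalanche step of MurmurHash3 (64-bit variant). */
    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    /** MurmurHash3 x64_128 over raw bytes; returns h1 then h2, each little-endian. */
    static byte[] murmur3x64x128(byte[] data, int seed) {
        ByteBuffer buf = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        long h1 = seed;
        long h2 = seed;

        // Body: mix every full 16-byte block into h1 and h2.
        while (buf.remaining() >= 16) {
            long k1 = buf.getLong();
            long k2 = buf.getLong();
            h1 ^= Long.rotateLeft(k1 * C1, 31) * C2;
            h1 = Long.rotateLeft(h1, 27) + h2;
            h1 = h1 * 5 + 0x52dce729;
            h2 ^= Long.rotateLeft(k2 * C2, 33) * C1;
            h2 = Long.rotateLeft(h2, 31) + h1;
            h2 = h2 * 5 + 0x38495ab5;
        }

        // Tail: the remaining 0..15 bytes, little-endian, bytes 0-7 into k1 and 8-15 into k2.
        int rem = buf.remaining();
        long k1 = 0;
        long k2 = 0;
        for (int i = 0; i < rem; i++) {
            long b = buf.get() & 0xffL;
            if (i < 8) {
                k1 |= b << (8 * i);
            } else {
                k2 |= b << (8 * (i - 8));
            }
        }
        if (rem > 8) {
            h2 ^= Long.rotateLeft(k2 * C2, 33) * C1;
        }
        if (rem > 0) {
            h1 ^= Long.rotateLeft(k1 * C1, 31) * C2;
        }

        // Finalization: fold in the length, then avalanche both halves.
        long len = data.length;
        h1 ^= len;
        h2 ^= len;
        h1 += h2;
        h2 += h1;
        h1 = fmix64(h1);
        h2 = fmix64(h2);
        h1 += h2;
        h2 += h1;
        return ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN).putLong(h1).putLong(h2).array();
    }

    /** Hypothetical helper: 32-char hex hash for a uid under the assumed scheme. */
    static String uidToHash(String uid) {
        byte[] hash = murmur3x64x128(uid.getBytes(StandardCharsets.UTF_8), 0);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : hash) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String uid = args.length > 0 ? args[0] : "my-human-readable-sink-uid";
        System.out.println(uid + " -> " + uidToHash(uid));
    }
}
```

After compiling, `java UidHashSketch my-human-readable-sink-uid` prints the hash derived from that uid. Comparing such output with the operator IDs of an existing savepoint is one way to sanity-check the mapping. Going the other direction (finding a uid that produces a given hash) would mean inverting the hash, which is the situation `setUidHash` and, for V2 sinks, `CustomSinkOperatorUidHashes` are meant to sidestep.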