YW, let me know whether it works, because it's a nifty feature.

G
On Mon, Jun 10, 2024 at 9:26 AM Salva Alcántara <salcantara...@gmail.com> wrote:

> Thanks Gabor, I will give it a try!
>
> On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Now I see the intention and then you must have a V2 sink, right? Maybe
>> you look for the following:
>>
>> final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
>> final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
>> final CustomSinkOperatorUidHashes operatorsUidHashes =
>>     CustomSinkOperatorUidHashes.builder()
>>         .setWriterUidHash(writerHash)
>>         .setCommitterUidHash(committerHash)
>>         .build();
>> src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
>>
>> G
>>
>>
>> On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <zhanghao.c...@outlook.com>
>> wrote:
>>
>>> Hi Salva,
>>>
>>> The SinkV2 transformation will be translated to multiple operators at
>>> the physical level. When setting a UID, Flink will automatically generate
>>> UID for sub-operators by filling the configured UID in a pre-defined naming
>>> template. The naming template is carefully maintained to ensure
>>> cross-version state compatibility. However, this cannot be easily done when
>>> setting the uidHash, that's why Flink currently does not support setting it
>>> for non-legacy sinks.
>>>
>>> A possible solution is to convert uidHash back to the uid and apply the
>>> same strategy for generating uids to compute the corresponding uidHash for
>>> each suboperator. Maybe you can further investigate it and fire a JIRA
>>> issue on it.
>>>
>>> Best,
>>> Zhanghao Chen
>>> ------------------------------
>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>> *Sent:* Sunday, June 9, 2024 14:49
>>> *To:* Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> *Cc:* user <user@flink.apache.org>
>>> *Subject:* Re: Setting uid hash for non-legacy sinks
>>>
>>> Hi Gabor,
>>>
>>> Yeah, I know this, but what if you initially forgot and now you want to
>>> add the uid "after the fact"?
>>>
>>> You need to get the operator/vertex id used by Flink for current
>>> savepoints and somehow set this id for the sink.
>>>
>>> With the uid method you would need to "hack" the existing hash (get a
>>> string which when hashed produces it). I guess this can be done since
>>> murmur3 is a non-cryptographic hash but Flink has the "setUidHash"
>>> precisely for that. From the javadocs:
>>>
>>> *Important*: this should be used as a workaround or for
>>> troubleshooting. The provided hash needs to be unique per transformation
>>> and job. A use case for this is in migration between Flink versions or
>>> changing the jobs in a way that changes the automatically generated hashes.
>>> In this case, providing the previous hashes directly through this method
>>> (e.g. obtained from old logs) can help to reestablish a lost mapping from
>>> states to their target operator.
>>>
>>>
>>> ...but as I said, it seems this method is not supported for new
>>> (non-legacy) sinks...
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>> Hi Salva,
>>>
>>> Just wondering why not good to set the uid like this?
>>> ```
>>> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
>>> ```
>>>
>>> From the mentioned UID Flink is going to make the hash which is
>>> consistent from UID -> HASH transformation perspective.
>>>
>>> BR,
>>> G
>>>
>>>
>>>
>>> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I want to add the uid for my Kafka sink in such a way that I can still
>>> use the existing savepoint. The problem I'm having is that I cannot set the
>>> uid hash. If I try something like this:
>>>
>>> ```
>>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>>> ```
>>>
>>> I get the following error:
>>>
>>> ```
>>> Exception in thread "main" java.lang.UnsupportedOperationException:
>>> Cannot set a custom UID hash on a non-legacy sink
>>> at
>>> org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>>> ```
>>>
>>> How can one set the operator id directly then for new (non-legacy) sinks?
>>>
>>> Kind regards,
>>>
>>> Salva
>>>
>>>
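
A side note for anyone trying the CustomSinkOperatorUidHashes approach from this thread: it may be worth sanity-checking the hash strings before handing them to the builder. The sketch below is plain Java with no Flink dependency. `UidHashCheck` and its methods are hypothetical names, not part of Flink. It assumes that a uid hash must be exactly 32 hexadecimal characters, which matches the writer and committer hashes quoted above, but that rule is an assumption to verify against your Flink version. Under that assumption the placeholder from the original question would also be rejected, since it contains the non-hex characters x, y and z (this is separate from the UnsupportedOperationException reported in the thread).

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration only; not part of Flink. */
final class UidHashCheck {

    // Assumption: a uid hash is exactly 32 hexadecimal characters.
    private static final Pattern HEX_32 = Pattern.compile("^[0-9A-Fa-f]{32}$");

    /** Returns true if the string looks like a 32-character hex hash. */
    static boolean isValidUidHash(String hash) {
        return hash != null && HEX_32.matcher(hash).matches();
    }

    /** Returns the hash unchanged, or throws if it does not look valid. */
    static String requireValidUidHash(String hash) {
        if (!isValidUidHash(hash)) {
            throw new IllegalArgumentException(
                    "Expected 32 hex characters, got: " + hash);
        }
        return hash;
    }

    public static void main(String[] args) {
        // Writer and committer hashes quoted in the thread.
        System.out.println(isValidUidHash("f6b178ce445dc3ffaa06bad27a51fead")); // true
        System.out.println(isValidUidHash("68ac8ae79eae4e3135a54f9689c4aa10")); // true
        // Placeholder from the original question: contains x, y, z.
        System.out.println(isValidUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")); // false
    }
}
```

With such a check in place, the strings passed to setWriterUidHash and setCommitterUidHash in the snippet above could first go through `requireValidUidHash`, so that a copy-paste mistake fails early with a readable message.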