Thanks Gabor, I will give it a try!

On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Now I see the intention, and in that case you must have a V2 sink, right?
> Maybe you are looking for the following:
>
> // Hashes for the writer and committer sub-operators of the V2 sink.
> final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
> final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
> final CustomSinkOperatorUidHashes operatorsUidHashes =
>         CustomSinkOperatorUidHashes.builder()
>                 .setWriterUidHash(writerHash)
>                 .setCommitterUidHash(committerHash)
>                 .build();
> // Pass the hashes when attaching the sink to the stream.
> src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
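>
> The hash values above are just examples; the real ones are the operator
> IDs of the writer/committer in your existing savepoint (per the
> setUidHash javadocs quoted below, they can be obtained e.g. from old
> logs).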
>
> G
>
>
> On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <zhanghao.c...@outlook.com>
> wrote:
>
>> Hi Salva,
>>
>> The SinkV2 transformation will be translated into multiple operators at the
>> physical level. When a UID is set, Flink will automatically generate UIDs
>> for the sub-operators by filling the configured UID into a pre-defined
>> naming template. The naming template is carefully maintained to ensure
>> cross-version state compatibility. However, this cannot easily be done when
>> setting the uidHash, which is why Flink currently does not support setting
>> it for non-legacy sinks.
>>
>> A possible solution is to convert the uidHash back to the uid and then
>> apply the same uid-generation strategy to compute the corresponding uidHash
>> for each sub-operator. Maybe you can investigate this further and file a
>> JIRA issue for it.
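>>
>> For illustration, here is a minimal sketch of the forward uid -> uidHash
>> part of that idea (assuming StreamGraphHasherV2 hashes a user-specified
>> uid with 128-bit murmur3 and seed 0; the class name and the sub-operator
>> uid strings below are hypothetical and would need to be checked against
>> the naming template in SinkTransformationTranslator for your Flink
>> version):
>>
>> ```
>> import com.google.common.hash.Hashing;
>> import java.nio.charset.StandardCharsets;
>>
>> public class UidHashSketch {
>>
>>     // Assumed uid -> hash scheme: murmur3_128 with seed 0 over the uid
>>     // string, rendered as a 32-character hex string.
>>     static String uidToHash(String uid) {
>>         return Hashing.murmur3_128(0)
>>                 .hashString(uid, StandardCharsets.UTF_8)
>>                 .toString();
>>     }
>>
>>     public static void main(String[] args) {
>>         // Hypothetical sub-operator uids derived from a sink uid
>>         // "my-sink"; the real naming template may differ per version.
>>         System.out.println(uidToHash("my-sink: Writer"));
>>         System.out.println(uidToHash("my-sink: Committer"));
>>     }
>> }
>> ```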
>>
>> Best,
>> Zhanghao Chen
>> ------------------------------
>> *From:* Salva Alcántara <salcantara...@gmail.com>
>> *Sent:* Sunday, June 9, 2024 14:49
>> *To:* Gabor Somogyi <gabor.g.somo...@gmail.com>
>> *Cc:* user <user@flink.apache.org>
>> *Subject:* Re: Setting uid hash for non-legacy sinks
>>
>> Hi Gabor,
>>
>> Yeah, I know this, but what if you initially forgot and now you want to
>> add the uid "after the fact"?
>>
>> You need to get the operator/vertex id that Flink used in the current
>> savepoints and somehow set this id for the sink.
>>
>> With the uid method, you would need to "hack" the existing hash (find a
>> string which, when hashed, produces it). I guess this can be done since
>> murmur3 is a non-cryptographic hash, but Flink has "setUidHash" precisely
>> for this purpose. From the javadocs:
>>
>> *Important*: this should be used as a workaround or for troubleshooting.
>> The provided hash needs to be unique per transformation and job. A use case
>> for this is in migration between Flink versions or changing the jobs in a
>> way that changes the automatically generated hashes. In this case,
>> providing the previous hashes directly through this method (e.g. obtained
>> from old logs) can help to reestablish a lost mapping from states to their
>> target operator.
>>
>>
>> ...but as I said, it seems this method is not supported for new
>> (non-legacy) sinks...
>>
>> Regards,
>>
>> Salva
>>
>> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>> Hi Salva,
>>
>> Just wondering, why is it not good to set the uid like this?
>> ```
>> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
>> ```
>>
>> From the mentioned UID, Flink is going to compute the hash, and the
>> UID -> HASH transformation is consistent (deterministic).
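>>
>> (Under the hood this should be a 128-bit murmur3 hash of the uid string,
>> computed by StreamGraphHasherV2, if I remember correctly.)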
>>
>> BR,
>> G
>>
>>
>>
>> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I want to add the uid for my Kafka sink in such a way that I can still
>> use the existing savepoint. The problem I'm having is that I cannot set the
>> uid hash. If I try something like this:
>>
>> ```
>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>> ```
>>
>> I get the following error:
>>
>> ```
>> Exception in thread "main" java.lang.UnsupportedOperationException:
>> Cannot set a custom UID hash on a non-legacy sink
>> at
>> org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>> ```
>>
>> How can one set the operator id directly then for new (non-legacy) sinks?
>>
>> Kind regards,
>>
>> Salva
>>
>>
