YW, ping me back on whether it works, because it's a nifty feature.

G

On Mon, Jun 10, 2024 at 9:26 AM Salva Alcántara <salcantara...@gmail.com>
wrote:

> Thanks Gabor, I will give it a try!
>
> On Mon, Jun 10, 2024 at 12:01 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> Now I see the intention. You must have a V2 sink then, right? Maybe you
>> are looking for the following:
>>
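>> import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
>>
>> // Operator hashes to reuse for the sink's writer and committer
>> final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
>> final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
>> final CustomSinkOperatorUidHashes operatorsUidHashes =
>>         CustomSinkOperatorUidHashes.builder()
>>                 .setWriterUidHash(writerHash)
>>                 .setCommitterUidHash(committerHash)
>>                 .build();
>> // Pass the hashes to sinkTo instead of calling setUidHash on the result
>> src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);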
>>
>> G
>>
>>
>> On Sun, Jun 9, 2024 at 1:21 PM Zhanghao Chen <zhanghao.c...@outlook.com>
>> wrote:
>>
>>> Hi Salva,
>>>
>>> The SinkV2 transformation will be translated to multiple operators at
>>> the physical level. When setting a UID, Flink will automatically generate
>>> UIDs for the sub-operators by filling the configured UID into a pre-defined
>>> naming template. The naming template is carefully maintained to ensure
>>> cross-version state compatibility. However, this cannot be easily done when
>>> setting the uidHash, which is why Flink currently does not support setting
>>> it for non-legacy sinks.
>>>
>>> A possible solution is to convert the uidHash back to a uid and apply the
>>> same strategy for generating uids to compute the corresponding uidHash for
>>> each sub-operator. Maybe you can investigate it further and file a JIRA
>>> issue for it.
>>>
>>> Best,
>>> Zhanghao Chen
>>> ------------------------------
>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>> *Sent:* Sunday, June 9, 2024 14:49
>>> *To:* Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> *Cc:* user <user@flink.apache.org>
>>> *Subject:* Re: Setting uid hash for non-legacy sinks
>>>
>>> Hi Gabor,
>>>
>>> Yeah, I know this, but what if you initially forgot and now you want to
>>> add the uid "after the fact"?
>>>
>>> You need to get the operator/vertex id used by Flink for current
>>> savepoints and somehow set this id for the sink.
>>>
>>> With the uid method you would need to "hack" the existing hash (find a
>>> string which, when hashed, produces it). I guess this can be done since
>>> murmur3 is a non-cryptographic hash, but Flink has "setUidHash"
>>> precisely for that. From the javadocs:
>>>
>>> *Important*: this should be used as a workaround or for
>>> troubleshooting. The provided hash needs to be unique per transformation
>>> and job. A use case for this is in migration between Flink versions or
>>> changing the jobs in a way that changes the automatically generated hashes.
>>> In this case, providing the previous hashes directly through this method
>>> (e.g. obtained from old logs) can help to reestablish a lost mapping from
>>> states to their target operator.
>>>
>>>
>>> ...but as I said, it seems this method is not supported for new
>>> (non-legacy) sinks...
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Fri, Jun 7, 2024 at 10:20 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>> Hi Salva,
>>>
>>> Just wondering, why not set the uid like this?
>>> ```
>>> output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
>>> ```
>>>
>>> Flink derives the hash from this UID, and the UID -> hash
>>> transformation is consistent: the same UID always yields the same hash.
>>>
>>> BR,
>>> G
>>>
>>>
>>>
>>> On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara <salcantara...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I want to add the uid for my Kafka sink in such a way that I can still
>>> use the existing savepoint. The problem I'm having is that I cannot set the
>>> uid hash. If I try something like this:
>>>
>>> ```
>>> output.sinkTo(outputSink).setUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
>>> ```
>>>
>>> I get the following error:
>>>
>>> ```
>>> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot set a custom UID hash on a non-legacy sink
>>> at org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
>>> ```
>>>
>>> How can one set the operator id directly then for new (non-legacy) sinks?
>>>
>>> Kind regards,
>>>
>>> Salva
>>>
>>>
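
A side note on the hash strings that appear in this thread. As far as I know, Flink only accepts a uid hash that is exactly 32 hexadecimal characters, so it can be worth checking a value copied from old logs or a savepoint before passing it to the builder. Below is a minimal sketch of such a check using only the JDK. The class name `UidHashCheck`, the method `isValidUidHash`, and the exact pattern are assumptions made for illustration; they are not part of the Flink API.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: checks the shape of a uid hash string.
final class UidHashCheck {

    // Assumption: a uid hash must be exactly 32 hexadecimal characters.
    private static final Pattern UID_HASH = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Returns true only if the candidate is non-null and matches the assumed format.
    static boolean isValidUidHash(String candidate) {
        return candidate != null && UID_HASH.matcher(candidate).matches();
    }

    public static void main(String[] args) {
        // The two hashes from the CustomSinkOperatorUidHashes snippet in this thread.
        System.out.println(isValidUidHash("f6b178ce445dc3ffaa06bad27a51fead"));
        System.out.println(isValidUidHash("68ac8ae79eae4e3135a54f9689c4aa10"));
        // The placeholder from the original question contains non-hex characters.
        System.out.println(isValidUidHash("xyzbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"));
    }
}
```

Under that assumption, the writer and committer hashes from the snippet pass the check, while the placeholder from the original question would be rejected because `x`, `y` and `z` are not hexadecimal digits.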
