Re: Setting uid hash for non-legacy sinks

2024-06-12 Thread Salva Alcántara
(Reacted with ♥️)

Re: Setting uid hash for non-legacy sinks

2024-06-12 Thread Gabor Somogyi
Hey Salva,

Good to hear your issue has been resolved one way or another!
Thanks for confirming that this operator hash trick is working on V2 as
well.

G


On Wed, Jun 12, 2024 at 5:20 AM Salva Alcántara 
wrote:

> Hey Gabor,
>
> In the end I didn't need to keep compatibility with existing savepoints, so
> I restarted my jobs from scratch, so to speak.
>
> However, I gave it a try locally, and by debugging the job graph I can see
> the hashes set manually following your recipe (see the
> userDefinedOperatorIDs), so it seems to work fine!
>
> Regards,
>
> Salva

Re: Setting uid hash for non-legacy sinks

2024-06-10 Thread Gabor Somogyi
YW, ping me back on whether it works, because it's a nifty feature.

G



Re: Setting uid hash for non-legacy sinks

2024-06-10 Thread Salva Alcántara
Thanks Gabor, I will give it a try!



Re: Setting uid hash for non-legacy sinks

2024-06-09 Thread Gabor Somogyi
Now I see the intention. Then you must have a V2 sink, right? Maybe you are
looking for the following:

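```
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;

// Hex IDs (32 characters each) of the writer and committer operators,
// e.g. taken from the logs of the job that produced the existing savepoint.
final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead";
final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10";
final CustomSinkOperatorUidHashes operatorsUidHashes =
        CustomSinkOperatorUidHashes.builder()
                .setWriterUidHash(writerHash)
                .setCommitterUidHash(committerHash)
                .build();
// src, sinkWithCommitter() and NAME are placeholders for your own
// DataStream, SinkV2 sink and operator name.
src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME);
```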

G
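Both values passed to the builder above are 32-character hex strings, i.e. 16-byte operator IDs. As far as I can tell from the Flink source, `Transformation#setUidHash` (which I believe these writer/committer hashes end up going through) rejects anything that does not match `^[0-9A-Fa-f]{32}$`, so a value like "xyzb" from the original question would be refused even on a legacy sink. A tiny pre-check for IDs copied from old logs might look like this; `UidHashCheck` and `isValidUidHash` are hypothetical names, and the pattern is an assumption about Flink's check rather than a documented contract:

```java
import java.util.regex.Pattern;

public class UidHashCheck {
    // Assumption: mirrors the 32-hex-character precondition that Flink's
    // Transformation#setUidHash appears to enforce; verify for your version.
    private static final Pattern UID_HASH = Pattern.compile("^[0-9A-Fa-f]{32}$");

    // Hypothetical helper: true if the string looks like a usable operator uid hash.
    static boolean isValidUidHash(String hash) {
        return hash != null && UID_HASH.matcher(hash).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {
            "f6b178ce445dc3ffaa06bad27a51fead", // writer hash from the snippet above
            "68ac8ae79eae4e3135a54f9689c4aa10", // committer hash from the snippet above
            "xyzb"                              // value from the original question
        };
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + isValidUidHash(candidate));
        }
    }
}
```

Running it prints `true` for the two hashes from the snippet and `false` for "xyzb".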




Re: Setting uid hash for non-legacy sinks

2024-06-09 Thread Zhanghao Chen
Hi Salva,

The SinkV2 transformation is translated into multiple operators at the
physical level. When a UID is set, Flink automatically generates UIDs for the
sub-operators by filling the configured UID into a pre-defined naming template.
The naming template is carefully maintained to ensure cross-version state
compatibility. However, this cannot easily be done when setting the uidHash,
which is why Flink currently does not support setting it for non-legacy sinks.

A possible solution is to convert the uidHash back to the uid and apply the
same uid-generation strategy to compute the corresponding uidHash for each
sub-operator. Maybe you can investigate this further and file a JIRA issue
for it.

Best,
Zhanghao Chen



Re: Setting uid hash for non-legacy sinks

2024-06-08 Thread Salva Alcántara
Hi Gabor,

Yeah, I know this, but what if you initially forgot and now you want to add
the uid "after the fact"?

You need to get the operator/vertex id used by Flink for current savepoints
and somehow set this id for the sink.

With the uid method you would need to "hack" the existing hash (find a
string that produces it when hashed). I guess this can be done, since
murmur3 is a non-cryptographic hash, but Flink has "setUidHash" precisely
for that. From the javadocs:

> *Important*: this should be used as a workaround or for troubleshooting.
> The provided hash needs to be unique per transformation and job. A use case
> for this is in migration between Flink versions or changing the jobs in a
> way that changes the automatically generated hashes. In this case,
> providing the previous hashes directly through this method (e.g. obtained
> from old logs) can help to reestablish a lost mapping from states to their
> target operator.


...but as I said, it seems this method is not supported for new
(non-legacy) sinks...

Regards,

Salva



Re: Setting uid hash for non-legacy sinks

2024-06-07 Thread Gabor Somogyi
Hi Salva,

Just wondering, why isn't it good enough to set the uid like this?
```
output.sinkTo(outputSink).uid("my-human-readable-sink-uid");
```

Flink computes the hash from the given UID, so the UID -> HASH mapping stays
consistent.

BR,
G



On Fri, Jun 7, 2024 at 7:54 AM Salva Alcántara 
wrote:

> Hi,
>
> I want to add the uid for my Kafka sink in such a way that I can still use
> the existing savepoint. The problem I'm having is that I cannot set the uid
> hash. If I try something like this:
>
> ```
> output.sinkTo(outputSink).setUidHash("xyzb");
> ```
>
> I get the following error:
>
> ```
> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
> set a custom UID hash on a non-legacy sink
> at
> org.apache.flink.streaming.api.datastream.DataStreamSink.setUidHash(DataStreamSink.java:163)
> ```
>
> How can one set the operator id directly then for new (non-legacy) sinks?
>
> Kind regards,
>
> Salva
>
>
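Gabor's point is that Flink turns a user-specified uid into an operator ID deterministically. Below is a minimal, dependency-free sketch of that mapping, assuming (from my reading of Flink's `StreamGraphHasherV2`, not from any documented contract) that the uid's UTF-8 bytes are hashed with Guava's `murmur3_128` using seed 0 and the 16-byte result is shown as lowercase hex. `UidHashSketch` and `operatorIdForUid` are hypothetical names. For a SinkV2 sink the sub-operators get uids derived from the configured one, so the writer/committer IDs will not simply be the hash of the string passed to `.uid()`. Compare the output with the IDs in your own job's logs before relying on it.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class UidHashSketch {
    // MurmurHash3 x64_128 mixing constants.
    private static final long C1 = 0x87c37b91114253d5L;
    private static final long C2 = 0x4cf5ad432745937fL;

    // Final avalanche step of MurmurHash3.
    private static long fmix64(long k) {
        k ^= k >>> 33;
        k *= 0xff51afd7ed558ccdL;
        k ^= k >>> 33;
        k *= 0xc4ceb9fe1a85ec53L;
        k ^= k >>> 33;
        return k;
    }

    // MurmurHash3_x64_128 with seed 0; returns h1 then h2 as little-endian bytes,
    // the same layout as Guava's murmur3_128(0) HashCode.asBytes().
    static byte[] murmur3x64128Seed0(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        int nblocks = data.length / 16;
        long h1 = 0;
        long h2 = 0;
        // Body: process full 16-byte blocks.
        for (int i = 0; i < nblocks; i++) {
            long k1 = in.getLong(i * 16);
            long k2 = in.getLong(i * 16 + 8);
            k1 *= C1; k1 = Long.rotateLeft(k1, 31); k1 *= C2; h1 ^= k1;
            h1 = Long.rotateLeft(h1, 27); h1 += h2; h1 = h1 * 5 + 0x52dce729;
            k2 *= C2; k2 = Long.rotateLeft(k2, 33); k2 *= C1; h2 ^= k2;
            h2 = Long.rotateLeft(h2, 31); h2 += h1; h2 = h2 * 5 + 0x38495ab5;
        }
        // Tail: the remaining 0..15 bytes.
        int tail = nblocks * 16;
        int rem = data.length & 15;
        long t1 = 0;
        long t2 = 0;
        for (int i = 8; i < rem; i++) {
            t2 ^= (data[tail + i] & 0xffL) << ((i - 8) * 8);
        }
        if (rem > 8) {
            t2 *= C2; t2 = Long.rotateLeft(t2, 33); t2 *= C1; h2 ^= t2;
        }
        for (int i = 0; i < Math.min(rem, 8); i++) {
            t1 ^= (data[tail + i] & 0xffL) << (i * 8);
        }
        if (rem > 0) {
            t1 *= C1; t1 = Long.rotateLeft(t1, 31); t1 *= C2; h1 ^= t1;
        }
        // Finalization.
        h1 ^= data.length;
        h2 ^= data.length;
        h1 += h2;
        h2 += h1;
        h1 = fmix64(h1);
        h2 = fmix64(h2);
        h1 += h2;
        h2 += h1;
        return ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN).putLong(h1).putLong(h2).array();
    }

    // Hypothetical helper: the 32-char hex ID Flink is assumed to derive from a uid.
    static String operatorIdForUid(String uid) {
        StringBuilder hex = new StringBuilder(32);
        for (byte b : murmur3x64128Seed0(uid.getBytes(StandardCharsets.UTF_8))) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    public static void main(String[] args) {
        System.out.println(operatorIdForUid("my-human-readable-sink-uid"));
    }
}
```

This only goes from uid to hash; it does not help with the reverse direction (finding a uid for an existing hash) discussed in the thread.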