Glad to hear that you solved your problem. As far as I know, Flink should
not read the fields of a message and call hashCode on them individually.
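
For reference, Flink derives the key group from the key object's own
hashCode. A rough sketch of what
org.apache.flink.runtime.state.KeyGroupRangeAssignment does internally
(simplified, not the exact code):

    // key group = murmur hash of the key's own hashCode, mod maxParallelism
    int keyGroup = MathUtils.murmurHash(key.hashCode()) % maxParallelism;
    // index of the operator subtask that owns this key group
    int operatorIndex = keyGroup * parallelism / maxParallelism;

So the only thing that has to be deterministic across JVMs is the key's own
hashCode.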

Cheers,
Till

On Fri, Oct 23, 2020 at 2:18 PM Radoslav Smilyanov <
radoslav.smilya...@smule.com> wrote:

> Hi Till,
>
> I found my problem. It was indeed related to a mutable hashCode.
>
> I was using a protobuf message in the key selector function, and one of the
> protobuf fields was an enum. I checked the hashCode implementation of the
> generated message: it uses the int value field of the protobuf message, so
> I assumed it was fine and immutable.
>
> I changed the key selector function to use a Tuple[Long, Int] instead (my
> protobuf message has only these two fields, where the Int stands for the
> enum's value field). After switching to the Tuple, it worked.
>
> I am not sure whether Flink somehow reads the protobuf message fields and
> uses their hashCodes directly, since the generated protobuf enum does have
> an unstable hashCode (Enum.hashCode).
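>
> For reference, java.lang.Enum.hashCode() is final and identity-based,
> which is why it is not stable across processes:
>
>     // From the JDK's java.lang.Enum:
>     // public final int hashCode() { return super.hashCode(); }
>     //
>     // super.hashCode() is Object's identity hash, so the same enum
>     // constant can hash differently in each task manager JVM, which
>     // makes the key-group assignment non-deterministic.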
>
> Nevertheless, it works fine with the Tuple key.
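>
> For illustration, a minimal sketch of such a key selector with the Java
> API (MyEvent, getId() and getStatus() are hypothetical names for my
> message and its fields):
>
>     import org.apache.flink.api.java.functions.KeySelector;
>     import org.apache.flink.api.java.tuple.Tuple2;
>
>     // Key on Tuple2<Long, Integer> instead of the protobuf message
>     // itself; Tuple2's hashCode is computed from the field values, so
>     // it is stable across JVMs.
>     KeySelector<MyEvent, Tuple2<Long, Integer>> keySelector =
>         new KeySelector<MyEvent, Tuple2<Long, Integer>>() {
>             @Override
>             public Tuple2<Long, Integer> getKey(MyEvent event) {
>                 // use the enum's numeric value (getNumber()) rather
>                 // than the enum constant, so nothing identity-based
>                 // enters the key
>                 return Tuple2.of(event.getId(),
>                                  event.getStatus().getNumber());
>             }
>         };
>
>     stream.keyBy(keySelector);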
>
> Thanks for your response!
>
> Best Regards,
> Rado
>
>
> On Fri, Oct 23, 2020 at 2:39 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hi Rado,
>>
>> it is hard to tell the reason without a bit more detail. Could you share
>> the complete logs of the problematic run? The job you are running, and the
>> types you store as state in RocksDB and use as events in your job, are
>> also very important. In the linked SO question, the problem was a type
>> whose hashCode was not immutable.
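>>
>> As an illustration of that failure mode (a hypothetical class, not taken
>> from your job):
>>
>>     import java.util.ArrayList;
>>     import java.util.List;
>>     import java.util.Objects;
>>
>>     // Broken key type: hashCode depends on mutable state, so the same
>>     // logical key can be assigned to different key groups over time.
>>     class SessionKey {
>>         long userId;
>>         final List<String> tags = new ArrayList<>(); // mutated after keyBy
>>
>>         @Override
>>         public int hashCode() {
>>             return Objects.hash(userId, tags); // changes when tags change
>>         }
>>     }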
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 21, 2020 at 6:24 PM Radoslav Smilyanov <
>> radoslav.smilya...@smule.com> wrote:
>>
>>> Hello all,
>>>
>>> I am running a Flink job that performs data enrichment. My job has 7
>>> Kafka consumers that receive messages for DML statements performed on 7
>>> DB tables.
>>>
>>> Job setup:
>>>
>>>    - Flink runs in Kubernetes in a similar way to what is described here
>>>    <https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#job-cluster-resource-definitions>.
>>>    - 1 job manager and 2 task managers
>>>    - parallelism set to 4, with 2 task slots per task manager
>>>    - RocksDB as the state backend
>>>    - Protobuf for serialization
>>>
>>> Whenever I try to trigger a savepoint after my state is bootstrapped, I
>>> get the following error for different operators:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Key group 0 is not in
>>> KeyGroupRange{startKeyGroup=32, endKeyGroup=63}.
>>> at
>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>> at
>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:319)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:261)
>>>
>>> Note: the key group might vary.
>>>
>>> I found this
>>> <https://stackoverflow.com/questions/49140654/flink-error-key-group-is-not-in-keygrouprange>
>>> question on Stack Overflow, which relates to such an exception (btw, my
>>> job graph looks similar to the one described there, except that my job
>>> has more joins). I double-checked my hashCodes and I think they are fine.
>>>
>>> I tried reducing the parallelism to 1, with 1 task slot per task manager,
>>> and that configuration seems to work. This leads me to think it might be
>>> some concurrency issue.
>>>
>>> I would like to understand what is causing the savepoint failure. Do you
>>> have any suggestions about what I might be missing?
>>>
>>> Thanks in advance!
>>>
>>> Best Regards,
>>> Rado
>>>
>>
