Ah, I think I've got it. This line:
  hash = (19 * hash) + getDescriptor().hashCode();

The object returned by getDescriptor() doesn't override hashCode(), so
Object.hashCode() is used. That is an identity hash, so the resulting key
hash differs from one JVM to the next.
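To spell out why this only breaks once parallelism goes above 1, here is a rough sketch of the key-to-subtask arithmetic. It follows what I understand Flink's KeyGroupRangeAssignment to do: key hash to key group, key group to subtask, and subtask to an inclusive key-group range. It is not Flink's code, though. `spread` uses MurmurHash3's fmix32 finalizer as a stand-in for Flink's own murmur-based hash, the two hash values are made up, and a max parallelism of 256 is an assumption because Config.MAX_PARALLELISM isn't shown in this thread.

```java
final class KeyGroupSketch {
    // MurmurHash3 fmix32 finalizer: a stand-in for Flink's murmur-based
    // hash, so concrete group numbers will not match a real job.
    static int spread(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // key.hashCode() -> key group in [0, maxParallelism)
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(spread(keyHash), maxParallelism);
    }

    // key group -> index of the subtask that owns it
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // inclusive [start, end] range of key groups owned by one subtask
    static int[] rangeFor(int subtask, int maxParallelism, int parallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 256; // assumed; Config.MAX_PARALLELISM is not shown
        // Hypothetical hashCode() values of the *same* TraceKey in two JVMs,
        // differing only because getDescriptor().hashCode() is identity-based.
        int hashInSenderJvm = 0x1a2b3c4d;
        int hashInReceiverJvm = 0x5e6f7081;

        for (int parallelism : new int[] {1, 2}) {
            // The sender routes the record using its own JVM's hash...
            int sentGroup = keyGroupFor(hashInSenderJvm, maxParallelism);
            int target = subtaskFor(sentGroup, maxParallelism, parallelism);
            int[] owned = rangeFor(target, maxParallelism, parallelism);
            // ...and the receiving window operator recomputes it with its own.
            int seenGroup = keyGroupFor(hashInReceiverJvm, maxParallelism);
            boolean inRange = seenGroup >= owned[0] && seenGroup <= owned[1];
            System.out.printf(
                "parallelism=%d: routed to subtask %d (groups %d..%d), receiver computes group %d, in range: %b%n",
                parallelism, target, owned[0], owned[1], seenGroup, inRange);
        }
    }
}
```

Assuming 256 key groups, key group 89 belongs to subtask 0 (groups 0..127). The subtask that threw owns groups 128 and up, which matches the exception in the original post. At parallelism 1 the single subtask owns every group, so the range check can never fail, no matter how unstable the hash is.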

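Here is one possible fix, sketched but not tried against this job: key by something whose hash depends only on field contents. `StableTraceKey` is a hypothetical class, not part of the generated code. It combines the same fields as the generated hashCode() but drops the descriptor term. It assumes accountId and env are proto string fields and that traceID is a 64-bit integer, since it goes through Internal.hashLong. The KeySelector would then return a StableTraceKey built from getAccountId(), getTraceID() and getEnv() instead of returning it.f0 directly.

```java
import java.util.Objects;

// Hypothetical key type: its hash is built only from field values, never
// from anything identity-based such as a descriptor's Object.hashCode().
final class StableTraceKey {
    private final String accountId;
    private final long traceId;
    private final String env;

    StableTraceKey(String accountId, long traceId, String env) {
        this.accountId = Objects.requireNonNull(accountId);
        this.traceId = traceId;
        this.env = Objects.requireNonNull(env);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof StableTraceKey)) return false;
        StableTraceKey other = (StableTraceKey) o;
        return traceId == other.traceId
                && accountId.equals(other.accountId)
                && env.equals(other.env);
    }

    @Override
    public int hashCode() {
        // String.hashCode() and Long.hashCode(long) are specified by the
        // Java API docs, so this value is identical in every JVM.
        int hash = 41;
        hash = 53 * hash + accountId.hashCode();
        hash = 53 * hash + Long.hashCode(traceId);
        hash = 53 * hash + env.hashCode();
        return hash;
    }
}
```

equals() uses the same fields as hashCode(), so equal keys always land in the same key group. Keying by a plain String concatenation of the three fields would work for the same reason.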

On Mon, Mar 21, 2022 at 10:10 AM Prashant Deva <[email protected]> wrote:

> Here is the entire source code of the TraceKeyOuterClass.TraceKey:
> https://gist.github.com/pdeva/26bd29fb5b9b842dd84d9d9d26018cc0
>
> It's a file autogenerated by protoc. The hashCode is indeed stable. Here
> is the hashCode() method of TraceKey (also in the gist above):
>
> @java.lang.Override
> public int hashCode() {
>   if (memoizedHashCode != 0) {
>     return memoizedHashCode;
>   }
>   int hash = 41;
>   hash = (19 * hash) + getDescriptor().hashCode();
>   hash = (37 * hash) + ACCOUNTID_FIELD_NUMBER;
>   hash = (53 * hash) + getAccountId().hashCode();
>   hash = (37 * hash) + TRACEID_FIELD_NUMBER;
>   hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
>       getTraceID());
>   hash = (37 * hash) + ENV_FIELD_NUMBER;
>   hash = (53 * hash) + getEnv().hashCode();
>   hash = (29 * hash) + unknownFields.hashCode();
>   memoizedHashCode = hash;
>   return hash;
> }
>
> On Sun, Mar 20, 2022 at 11:22 PM Guowei Ma <[email protected]> wrote:
>
>> It seems that the key's hashCode is not stable. Could you share the
>> definition of `TraceKeyOuterClass.TraceKey`?
>>
>> Best,
>> Guowei
>>
>>
>> On Sun, Mar 20, 2022 at 3:21 PM Prashant Deva <[email protected]> wrote:
>>
>>> Here is the key selector code (in Kotlin):
>>>
>>> val ks = object : KeySelector<
>>>         Tuple2<TraceKeyOuterClass.TraceKey, TraceFragmentOuterClass.TraceFragment>,
>>>         TraceKeyOuterClass.TraceKey> {
>>>     override fun getKey(
>>>         it: Tuple2<TraceKeyOuterClass.TraceKey, TraceFragmentOuterClass.TraceFragment>
>>>     ): TraceKeyOuterClass.TraceKey {
>>>         return it.f0
>>>     }
>>> }
>>>
>>> And here is the code that uses it:
>>>
>>> env.addSource(kafkaConsumer, name_source)
>>>     .name(name_source).uid(name_source).setMaxParallelism(Config.MAX_PARALLELISM)
>>>     .keyBy(ks)
>>>     .window(EventTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.seconds(60)))
>>>     .process(MyProcessor())
>>>     .name(name_processor).uid(name_processor).setMaxParallelism(Config.MAX_PARALLELISM)
>>>     .addSink(kafkaProducer)
>>>     .uid(name_sink).name(name_sink)
>>>
>>>
>>> I am using ProtobufSerializer from the chill-protobuf library for serde. It's
>>> configured as follows:
>>>
>>>
>>> env.config.registerTypeWithKryoSerializer(
>>>     TraceFragmentOuterClass.TraceFragment::class.java, ProtobufSerializer::class.java)
>>> env.config.registerTypeWithKryoSerializer(
>>>     TraceKeyOuterClass.TraceKey::class.java, ProtobufSerializer::class.java)
>>> env.config.registerTypeWithKryoSerializer(
>>>     FullTraceOuterClass.FullTrace::class.java, ProtobufSerializer::class.java)
>>> env.config.registerTypeWithKryoSerializer(
>>>     SpanOuterClass.Span::class.java, ProtobufSerializer::class.java)
>>>
>>>
>>> On Sun, Mar 20, 2022 at 12:15 AM caoyu <[email protected]> wrote:
>>>
>>>> Could you paste the key code here to help with debugging?
>>>>
>>>> ---- Replied Message ----
>>>> From Prashant Deva<[email protected]> <[email protected]>
>>>> Date 03/20/2022 12:24
>>>> To user<[email protected]> <[email protected]>
>>>> Subject exception when parallelizing application
>>>> I'm using Flink 1.13.2. When I increase the parallelism of my
>>>> application from 1 to 2, I see the following exceptions. What do they
>>>> mean? How can I fix this?
>>>>
>>>> java.lang.IllegalArgumentException: key group from 128 to 256 does not contain 89
>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
>>>>     at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:44)
>>>>     at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:936)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>>
>>>>
