Ah, I think I got it. It's this line:

    hash = (19 * hash) + getDescriptor().hashCode();

The object returned by getDescriptor() doesn't override hashCode(), so the default identity-based Object.hashCode() is used, which results in a hash that is unstable across JVMs.

On Mon, Mar 21, 2022 at 10:10 AM Prashant Deva wrote:

> Here is the entire source code of TraceKeyOuterClass.TraceKey:
> https://gist.github.com/pdeva/26bd29fb5b9b842dd84d9d9d26018cc0
>
> It's an autogenerated file by protoc. The hashCode is indeed stable.
> Here is the hashCode() method of TraceKey (also in the gist above):
>
>   @java.lang.Override
>   public int hashCode() {
>     if (memoizedHashCode != 0) {
>       return memoizedHashCode;
>     }
>     int hash = 41;
>     hash = (19 * hash) + getDescriptor().hashCode();
>     hash = (37 * hash) + ACCOUNTID_FIELD_NUMBER;
>     hash = (53 * hash) + getAccountId().hashCode();
>     hash = (37 * hash) + TRACEID_FIELD_NUMBER;
>     hash = (53 * hash) + com.google.protobuf.Internal.hashLong(
>         getTraceID());
>     hash = (37 * hash) + ENV_FIELD_NUMBER;
>     hash = (53 * hash) + getEnv().hashCode();
>     hash = (29 * hash) + unknownFields.hashCode();
>     memoizedHashCode = hash;
>     return hash;
>   }
>
> On Sun, Mar 20, 2022 at 11:22 PM Guowei Ma wrote:
>
>> It seems that the key's hashCode is not stable.
>> So would you like to show the details of
>> `TraceKeyOuterClass.TraceKey`?
>>
>> Best,
>> Guowei
>>
>>
>> On Sun, Mar 20, 2022 at 3:21 PM Prashant Deva wrote:
>>
>>> Here is the key code (in Kotlin):
>>>
>>> val ks = object : KeySelector<Tuple2<TraceKeyOuterClass.TraceKey,
>>>         TraceFragmentOuterClass.TraceFragment>, TraceKeyOuterClass.TraceKey> {
>>>     override fun getKey(it: Tuple2<TraceKeyOuterClass.TraceKey,
>>>             TraceFragmentOuterClass.TraceFragment>): TraceKeyOuterClass.TraceKey {
>>>         return it.f0
>>>     }
>>> }
>>>
>>> And here is the code that uses it:
>>>
>>> env.addSource(kafkaConsumer, name_source)
>>>     .name(name_source).uid(name_source).setMaxParallelism(Config.MAX_PARALLELISM)
>>>     .keyBy(ks)
>>>     .window(EventTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.seconds(60)))
>>>     .process(MyProcessor())
>>>     .name(name_processor).uid(name_processor).setMaxParallelism(Config.MAX_PARALLELISM)
>>>     .addSink(kafkaProducer)
>>>     .uid(name_sink).name(name_sink)
>>>
>>> I am using ProtobufSerializer from the chill-protobuf library for serde.
>>> It's configured as follows:
>>>
>>> env.config.registerTypeWithKryoSerializer(TraceFragmentOuterClass.TraceFragment::class.java, ProtobufSerializer::class.java)
>>> env.config.registerTypeWithKryoSerializer(TraceKeyOuterClass.TraceKey::class.java, ProtobufSerializer::class.java)
>>> env.config.registerTypeWithKryoSerializer(FullTraceOuterClass.FullTrace::class.java, ProtobufSerializer::class.java)
>>> env.config.registerTypeWithKryoSerializer(SpanOuterClass.Span::class.java, ProtobufSerializer::class.java)
>>>
>>>
>>> On Sun, Mar 20, 2022 at 12:15 AM caoyu wrote:
>>>
>>>> Would you like to copy the key code here to help with debugging?
>>>>
>>>> ---- Replied Message ----
>>>> From     Prashant Deva
>>>> Date     03/20/2022 12:24
>>>> To       user
>>>> Subject  exception when parallelizing application
>>>>
>>>> Using Flink 1.13.2.
>>>> When I increase the parallelism of my application from 1 to 2, I see the
>>>> following exceptions. What do they mean? How can I possibly fix this?
>>>>
>>>> java.lang.IllegalArgumentException: key group from 128 to 256 does not contain 89
>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
>>>>     at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:44)
>>>>     at org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:936)
>>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
>>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
>>>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:784)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571)
>>>>     at java.base/java.lang.Thread.run(Thread.java:829)
>>>>
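The exception can be traced through Flink's key-group arithmetic. The upstream subtask hashes the key to pick a key group, and with it a target subtask. The window operator then recomputes the key group from the same key when it registers its event-time timer (the KeyGroupPartitionedPriorityQueue frames above). If the two sides compute different hashCode() values, for example because getDescriptor().hashCode() is an identity hash that differs between JVMs, the recomputed group can fall outside the range the receiving subtask owns. With parallelism 1 the single subtask owns every key group, so such a mismatch would go unnoticed there. The Java sketch below is a self-contained simulation, not Flink code. The owner and range formulas follow my understanding of Flink's KeyGroupRangeAssignment. `mix` is a simplified stand-in for Flink's murmur-hash step. `protoStyleHash` and its field numbers are hypothetical. The max parallelism of 256 is an assumption, because the thread never gives the value of Config.MAX_PARALLELISM.

```java
// Simulation of Flink-style key-group routing (hypothetical, not Flink's code).
// Assumptions: maxParallelism = 256 (Config.MAX_PARALLELISM is not given in the
// thread) and a simplified stand-in for Flink's murmur-hash step.
class KeyGroupSketch {

    // Stand-in bit mixer; clearing the sign bit keeps the result non-negative.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE;
    }

    // The key group depends only on the key's hashCode().
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return mix(keyHash) % maxParallelism;
    }

    // Index of the parallel subtask that owns a key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] range of key groups owned by one subtask.
    static int[] rangeFor(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    static boolean contains(int[] range, int keyGroup) {
        return keyGroup >= range[0] && keyGroup <= range[1];
    }

    // Same shape as the protoc-generated hashCode() quoted in the thread.
    // descriptorHash plays the role of getDescriptor().hashCode(); field numbers
    // 1..3 are hypothetical and unknownFields is omitted.
    static int protoStyleHash(int descriptorHash, String accountId, long traceId, String env) {
        int hash = 41;
        hash = (19 * hash) + descriptorHash;
        hash = (37 * hash) + 1;
        hash = (53 * hash) + accountId.hashCode();
        hash = (37 * hash) + 2;
        hash = (53 * hash) + Long.hashCode(traceId);
        hash = (37 * hash) + 3;
        hash = (53 * hash) + env.hashCode();
        return hash;
    }

    public static void main(String[] args) {
        int maxParallelism = 256;
        int parallelism = 2;
        for (int i = 0; i < parallelism; i++) {
            int[] r = rangeFor(i, maxParallelism, parallelism);
            System.out.println("subtask " + i + " owns key groups " + r[0] + ".." + r[1]);
        }

        // Same field values, two different descriptor identity hashes
        // (arbitrary numbers standing in for two JVMs).
        int senderHash = protoStyleHash(0x1234abcd, "acct-1", 42L, "prod");
        int receiverHash = protoStyleHash(0x7e57f00d, "acct-1", 42L, "prod");
        int senderGroup = keyGroupFor(senderHash, maxParallelism);
        int receiverGroup = keyGroupFor(receiverHash, maxParallelism);
        int target = operatorIndexFor(senderGroup, maxParallelism, parallelism);
        int[] targetRange = rangeFor(target, maxParallelism, parallelism);
        System.out.println("sender routes key group " + senderGroup + " to subtask " + target);
        System.out.println("receiver recomputes key group " + receiverGroup
                + ", in range: " + contains(targetRange, receiverGroup));
    }
}
```

Running `main` prints the two subtasks' ranges (0..127 and 128..255 under these assumptions) and then shows that the same field values produce different key groups once the descriptor hash changes. Whether the receiver's group happens to land inside the target subtask's range depends on the particular numbers.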

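Following the diagnosis at the top of the thread, one way to avoid the problem is to key the stream by a value whose hashCode() depends only on field contents rather than on the protobuf message object. The Java sketch below is a hypothetical key class, `StableTraceKey`, not something from the thread. Its field types (String, long, String) are assumptions inferred from getAccountId(), getTraceID() and getEnv() in the quoted hashCode(). It relies on Objects.hash, which is documented as equivalent to Arrays.hashCode over its arguments, and on String and Long hash codes, whose formulas are fixed by their Javadoc. As a result, equal keys should hash identically in every JVM. A Java record would be shorter, but the Record.hashCode algorithm is explicitly unspecified, so an explicit implementation is used here.

```java
import java.util.Objects;

// Hypothetical replacement key for keyBy: equals() and hashCode() depend only on
// field values, never on object identity.
// Field types are assumptions based on the getters in the quoted hashCode().
final class StableTraceKey {
    private final String accountId;
    private final long traceId;
    private final String env;

    StableTraceKey(String accountId, long traceId, String env) {
        this.accountId = Objects.requireNonNull(accountId);
        this.traceId = traceId;
        this.env = Objects.requireNonNull(env);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof StableTraceKey)) {
            return false;
        }
        StableTraceKey other = (StableTraceKey) o;
        return traceId == other.traceId
                && accountId.equals(other.accountId)
                && env.equals(other.env);
    }

    @Override
    public int hashCode() {
        // Objects.hash is defined as Arrays.hashCode over its arguments; the
        // String and Long hash codes it combines are value-based.
        return Objects.hash(accountId, traceId, env);
    }

    public static void main(String[] args) {
        StableTraceKey a = new StableTraceKey("acct-1", 42L, "prod");
        StableTraceKey b = new StableTraceKey("acct-1", 42L, "prod");
        // Two separately built keys with equal fields: prints "true true".
        System.out.println(a.equals(b) + " " + (a.hashCode() == b.hashCode()));
    }
}
```

In the Kotlin pipeline, `getKey` would then return such a key built from `it.f0`'s fields instead of `it.f0` itself. A String built from the same fields, with a separator that cannot occur in the values, would have the same property, because String.hashCode is value-based.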