[ https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199858#comment-17199858 ]
Saad Rasool commented on KAFKA-10475:
-------------------------------------

[~bchen225242] I will get back to you with sample data by the end of this week.

> Using same key reports different count of records for groupBy() and groupByKey() in Kafka Streaming Application
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10475
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>         Environment: Kafka Cluster:
> Kafka Version: kafka_2.12-2.6.0
> openjdk version "1.8.0_265"
> Kafka Streams:
> Kafka Streams Version: 2.3.0
> openjdk version "11.0.8"
>            Reporter: Saad Rasool
>            Assignee: Divya Guduru
>            Priority: Major
>
> We are experiencing what amounts to "lost packets" in our stream processing when we use custom grouping keys via groupBy(). We have a single processor node, with a source topic from which we read packets, group them, aggregate each group, and produce output based on a computation that requires access to a state store.
>
> Greater detail on the problem and how we have tried to understand it so far is below:
>
> *Overview*
> We are setting up a Kafka Streams application in which we have to perform windowed operations. We group devices based on a specific key. The following sample columns are used for the groupBy:
>
> ||Field Name||Field Value||
> |A|12|
> |B|abc|
> |C|x13|
>
> Sample key based on the above data: 12abcx13, where key = Field (A) + Field (B) + Field (C).
>
> *Problem*
> We get a different count of records in two scenarios against the same key:
> # When specifying the key ourselves using groupBy()
> # When using groupByKey() to group the data on the input Kafka topic's partitioning key
>
> *Description*
> We first used the groupBy() function of Kafka Streams to group the devices using the key above.
> In this case, the Streams application dropped several records and produced fewer records than expected. However, when we did not specify our own custom grouping using the groupBy() function, and instead used groupByKey() to key the data on the original incoming Kafka partition key, we got exactly the number of records that were expected.
>
> To check that our custom groupBy() function was using exactly the same keys as the input topic, we compared both keys within the code. The input topic key and the custom key were identical.
>
> We have therefore come to the conclusion that there is some internal behavior of groupBy() that we do not understand, because of which groupBy() and groupByKey() report different counts for the same key. We have searched multiple forums but are unable to find the reason for this behavior.
>
> *Code Snippet:*
>
> With groupByKey():
>
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
>
> With groupBy():
>
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupBy((key, value) -> value.A + value.B + value.C,
>         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());
> {code}
>
> ||*Kafka Cluster Setup*||Value||
> |Number of Nodes|3|
> |CPU Cores|2|
> |RAM|8 GB|
>
> ||*Streaming Application Setup*||Version||
> |Kafka Streams Version|2.3.0|
> |openjdk version|11.0.8|

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
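Since the report hinges on the custom key being byte-for-byte identical to the topic key, one cheap check is to reproduce the selector's string concatenation outside Kafka: in Java, a null field concatenates as the literal "null", silently producing a key that exists nowhere on the input topic. A minimal stdlib-only sketch (the class and method names are illustrative, not from the reporter's code):

```java
// Hypothetical helper: rebuilds the composite grouping key the same way the
// reported groupBy() selector does (value.A + value.B + value.C) so it can be
// compared against the record key carried on the input topic.
public class CompositeKeyCheck {

    public static String compositeKey(String a, String b, String c) {
        // Plain Java string concatenation, mirroring the selector in the report.
        // Caveat: a null field is rendered as the literal "null", yielding a
        // key that can never match the original topic key.
        return a + b + c;
    }

    public static void main(String[] args) {
        String topicKey = "12abcx13"; // sample key from the table above
        System.out.println(compositeKey("12", "abc", "x13").equals(topicKey)); // true
        System.out.println(compositeKey("12", null, "x13")); // "12nullx13"
    }
}
```

Comparing this rebuilt key against the incoming record key (e.g. in a `peek()` before the `groupBy()`) would confirm whether the mismatch arises from key construction or from the repartitioning that `groupBy()` triggers.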