[ 
https://issues.apache.org/jira/browse/KAFKA-10475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17199858#comment-17199858
 ] 

Saad Rasool commented on KAFKA-10475:
-------------------------------------

[~bchen225242] I will get back to you with sample data by the end of this week.

> Using same key reports different count of records for groupBy() and 
> groupByKey() in Kafka Streaming Application
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10475
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10475
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>         Environment: Kafka Cluster:
> Kafka Version: kafka_2.12-2.6.0
> openjdk version "1.8.0_265"
> Kafka Streams:
> Kafka Streams Version: 2.3.0
> openjdk version "11.0.8"
>            Reporter: Saad Rasool
>            Assignee: Divya Guduru
>            Priority: Major
>
>  
> We are experiencing what amounts to “lost” records in our stream processing 
> when we group on a custom key via groupBy(). We have a single processor node, with 
> a source topic from which we read records, perform a grouping and aggregation on 
> that group, and produce output based on a computation that requires access to a 
> state store.
>  
> Below are further details of the problem and how we have tried to 
> understand it so far:
> *Overview* We are setting up a Kafka Streams application in which we have to 
> perform windowed operations. We group devices by a specific key. 
> The following sample fields are used to build the grouping key:
>  
> ||Field Name||Field Value||
> |A|12|
> |B|abc|
> |C|x13|
>  
> Sample key based on the above data: 12abcx13, where key = Field (A) + Field 
> (B) + Field (C).
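> For illustration, the composite key above can be built by plain string 
> concatenation (a minimal sketch; the Input class and field names a, b, c are 
> hypothetical stand-ins for the actual record schema):
>  
> {code:java}
> public class KeyExample {
>     // Hypothetical record mirroring the three grouping fields above
>     static class Input {
>         String a = "12";
>         String b = "abc";
>         String c = "x13";
>     }
>
>     // key = Field (A) + Field (B) + Field (C)
>     static String compositeKey(Input v) {
>         return v.a + v.b + v.c;
>     }
>
>     public static void main(String[] args) {
>         System.out.println(compositeKey(new Input())); // prints 12abcx13
>     }
> }
> {code}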
> *Problem* We get a different count of records against the same key in two 
> scenarios:
> # When specifying the key ourselves using groupBy()
> # When using groupByKey() to group the data on the ‘Input Kafka Topic’ partitioning key
> *Description* We first used the groupBy() function of Kafka Streams to 
> group the devices using the key above. In this case, the streams application 
> dropped several records and produced fewer records than expected. 
> However, when we did not specify our own custom grouping via the groupBy() 
> function, and instead used groupByKey() to key the data on the original 
> incoming Kafka partition key, we got exactly the number of records that were 
> expected.
> To check that we were using exactly the same keys as the input topic for our 
> custom groupBy() function, we compared both keys within the code. The input 
> topic key and the custom key were identical.
> We have therefore concluded that there is some internal behavior of the 
> groupBy() function that we do not understand, because of which groupBy() and 
> groupByKey() report different counts for the same key. We have searched 
> multiple forums but have been unable to find the reason for this behavior.
> *Code Snippet:*
> With groupByKey():
>  
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());{code}
>  
>   
> With groupBy():
>  
> {code:java}
> KStream<String, Output> myStream = this.stream
>     .groupBy((key, value) -> value.A + value.B + value.C, 
>         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime))
>         .advanceBy(Duration.ofMinutes(advanceByTime))
>         .grace(Duration.ofSeconds(gracePeriod)))
>     .reduce((value1, value2) -> value2)
>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>     .toStream()
>     .transform(new myTransformer(this.store.name()), this.store.name());{code}
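> One structural difference between the two topologies may be worth noting: 
> because groupBy() changes the key, Kafka Streams inserts an internal 
> repartition topic before the aggregation, while groupByKey() on an unchanged 
> key does not. Whether this accounts for the differing counts is unclear, but 
> the difference can be inspected offline with Topology#describe() (a minimal 
> sketch using String serdes and a hypothetical "input" topic; no broker is 
> needed):
>  
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.Grouped;
>
> public class DescribeExample {
>     public static void main(String[] args) {
>         StreamsBuilder builder = new StreamsBuilder();
>         // Re-keying via groupBy() marks the stream for repartitioning,
>         // so a repartition step is added before the aggregation
>         builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
>                .groupBy((key, value) -> value,
>                         Grouped.with(Serdes.String(), Serdes.String()))
>                .count();
>         // The printed topology includes an internal "-repartition" topic
>         System.out.println(builder.build().describe());
>     }
> }
> {code}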
>  
>   
> ||*Kafka Cluster Setup*||Value||
> |Number of Nodes|3|
> |CPU Cores|2|
> |RAM|8 GB|
>  
> ||*Streaming Application Setup*||Version||
> |Kafka Streams|2.3.0|
> |openjdk|11.0.8|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)