[ https://issues.apache.org/jira/browse/KAFKA-12564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jessica Johann updated KAFKA-12564:
-----------------------------------
    Description: 
Libraries from build.sbt:

{{"org.apache.kafka" % "kafka_2.13" % "2.7.0",}}

{{"org.apache.kafka" % "kafka-streams" % "2.7.0",}}

{{"org.apache.kafka" % "kafka-clients" % "2.7.0",}}

{{"org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0",}}
h4.  

Feed the stream "issue_stream" with the following records (key -> value):

{{(1->"A")}}
{{(1->"B")}}
h4.  

Topology:

{{// #1}}
{{val issueStream:KStream[Int,String] = builder.stream[Int,String]("issue_stream")}}

{{// #2}}
{{val aggTable:KTable[Int,String] =}}
{{issueStream}}
{{.groupBy((k,v)=>k)}}
{{.aggregate[String]("EMPTY")((k,v,agg)=>s"$agg+$v")}}

{{// #3}}
{{aggTable}}
{{.toStream}}
{{.print(Printed.toSysOut)}}

{{// #4}}
{{aggTable.filter((k,v)=> {}}
{{  println(s"filter($k, $v) at ${System.nanoTime()}")}}
{{  true}}
{{})}}
{{.toStream}}
{{.print(Printed.toSysOut)}}
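To make the expected values explicit, the aggregates that step #2 should produce for key 1 can be computed without a broker. This is a plain-Scala sketch, not Kafka Streams code: it assumes the two records arrive in the order listed above, and the object name AggregateModel is hypothetical. Consecutive entries of the result are the (previous, current) aggregate pairs that appear in the #4 log lines.

```scala
object AggregateModel {
  // Hypothetical stand-in for step #2 (groupBy + aggregate) for a single key:
  // start from the initializer "EMPTY" and apply the same aggregator,
  // s"$agg+$v", to each value in arrival order. scanLeft keeps every
  // intermediate aggregate, including the initial one.
  def aggregates(values: Seq[String]): List[String] =
    values.scanLeft("EMPTY")((agg, v) => s"$agg+$v").toList

  def main(args: Array[String]): Unit =
    // Prints EMPTY, EMPTY+A and EMPTY+A+B, one per line.
    aggregates(Seq("A", "B")).foreach(println)
}
```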
h4.  

First tuple: (1->"A")

#3 outputs the aggregated tuple ("EMPTY"+"+A"), as expected:

{{[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A}}

 

#4 The filter method is called twice.
The first call receives the expected tuple:

{{filter(1, EMPTY+A) at 211379567071847}}

The second call receives the initial (empty) aggregate:

{{filter(1, EMPTY) at 211379567120375}}

The output contains the correct tuple:

{{[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A}}
h4.  

Second tuple: (1->"B")

#3 outputs the aggregated tuple ("EMPTY"+"+A"+"+B"), as expected:

{{[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A+B}}

#4 Again, filter(...) is called a second, unexpected time, now with the previous aggregate (the value before (1->"B") was aggregated).
First call:

{{filter(1, EMPTY+A+B) at 211379567498482}}

Second call:

{{filter(1, EMPTY+A) at 211379567524475}}

However, the output contains only one tuple, as expected:

{{[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A+B}}
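The observed call pattern is consistent with the filter predicate being evaluated on both the new and the previous aggregate of each update, while toStream forwards only the new value. The sketch below is a plain-Scala model of that assumption, not Kafka Streams code and not a statement about its internals; the names FilterTwiceModel, Change, aggregate, filterChange and run are hypothetical. It reproduces the four filter(...) calls (without timestamps) and the two output tuples reported above.

```scala
import scala.collection.mutable

object FilterTwiceModel {
  // Hypothetical update record: the new aggregate plus, optionally,
  // the aggregate it replaces.
  final case class Change(newValue: String, oldValue: Option[String])

  // Models step #2 for a single key. Assumption: each update carries the
  // previous aggregate as its old value; for the first record that is the
  // initializer result "EMPTY".
  def aggregate(values: Seq[String]): List[Change] = {
    var agg = "EMPTY"
    values.toList.map { v =>
      val previous = agg
      agg = s"$agg+$v"
      Change(agg, Some(previous))
    }
  }

  // Models step #4 under the assumption that the predicate is applied to the
  // new value first and then to the old value; both filtered results are kept.
  def filterChange(
      key: Int,
      change: Change,
      predicate: (Int, String) => Boolean
  ): (Option[String], Option[String]) = {
    val keptNew = Some(change.newValue).filter(v => predicate(key, v))
    val keptOld = change.oldValue.filter(v => predicate(key, v))
    (keptNew, keptOld)
  }

  // Feeds (1->"A") and (1->"B") through the model and records every predicate call.
  def run(): (List[String], List[String]) = {
    val calls = mutable.ListBuffer.empty[String]
    val predicate: (Int, String) => Boolean = (k, v) => {
      calls += s"filter($k, $v)"
      true // like the reported filter: log the call and keep everything
    }
    // Like toStream, emit only the new value of each filtered update.
    val output = aggregate(Seq("A", "B")).flatMap(c => filterChange(1, c, predicate)._1)
    (calls.toList, output)
  }

  def main(args: Array[String]): Unit = {
    val (calls, output) = run()
    calls.foreach(println)
    output.foreach(v => println(s"1, $v"))
  }
}
```

In this model the second predicate call per record only affects the (discarded) old value, which is why each record still yields a single output tuple. Whether Kafka Streams 2.7.0 behaves this way for the reason modelled here would need to be confirmed against its KTable#filter implementation.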


> KTable#filter-method called twice after aggregation
> ---------------------------------------------------
>
>                 Key: KAFKA-12564
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12564
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Jessica Johann
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
