[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590442#comment-16590442
 ] 

ASF GitHub Bot commented on KAFKA-7316:
---------------------------------------

guozhangwang closed pull request #5538: KAFKA-7316 Fix Streams Scala filter 
recursive call
URL: https://github.com/apache/kafka/pull/5538
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index a78d321c941..d41496fb21c 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -47,7 +47,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
    */
   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
-    inner.filter(predicate(_, _))
+    inner.filter(predicate.asPredicate)
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] 
which satisfies the given
@@ -71,7 +71,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
    */
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
-    inner.filterNot(predicate(_, _))
+    inner.filterNot(predicate.asPredicate)
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] 
which do <em>not</em> satisfy the given
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index 6a302b207a9..2e2132d14eb 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -29,6 +29,52 @@ import org.scalatest.{FlatSpec, Matchers}
 @RunWith(classOf[JUnitRunner])
 class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
+  "filter a KStream" should "filter records satisfying the predicate" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    builder.stream[String, String](sourceTopic).filter((_, value) => value != 
"value2").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "filterNot a KStream" should "filter records not satisfying the predicate" 
in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    builder.stream[String, String](sourceTopic).filterNot((_, value) => value 
== "value2").to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
   "selectKey a KStream" should "select a new key" in {
     val builder = new StreamsBuilder()
     val sourceTopic = "source"
@@ -44,6 +90,8 @@ class KStreamTest extends FlatSpec with Matchers with 
TestDriver {
     testDriver.pipeRecord(sourceTopic, ("1", "value2"))
     testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2"
 
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
     testDriver.close()
   }
 
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
index 8c88ff5066f..2e9c821ed80 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
@@ -29,6 +29,72 @@ import org.scalatest.{FlatSpec, Matchers}
 @RunWith(classOf[JUnitRunner])
 class KTableTest extends FlatSpec with Matchers with TestDriver {
 
+  "filter a KTable" should "filter records satisfying the predicate" in {
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
+    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe (null: java.lang.Long)
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 2
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "2"
+      record.value shouldBe (null: java.lang.Long)
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
+  "filterNot a KTable" should "filter records not satisfying the predicate" in 
{
+    val builder = new StreamsBuilder()
+    val sourceTopic = "source"
+    val sinkTopic = "sink"
+
+    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
+    table.filterNot((_, value) => value > 1).toStream.to(sinkTopic)
+
+    val testDriver = createTestDriver(builder)
+
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 1
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "value2"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe (null: java.lang.Long)
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("2", "value1"))
+      val record = testDriver.readRecord[String, Long](sinkTopic)
+      record.key shouldBe "2"
+      record.value shouldBe 1
+    }
+    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+
+    testDriver.close()
+  }
+
   "join 2 KTables" should "join correctly records" in {
     val builder = new StreamsBuilder()
     val sourceTopic1 = "source1"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use of filter method in KTable.scala may result in StackOverflowError
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7316
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7316
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Ted Yu
>            Priority: Major
>              Labels: scala
>         Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>      val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>      val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +    userRegionsTable.filter { case (_, count) => true }
>      // Compute the total per region by summing the individual click counts 
> per region.
>      val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to