[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587959#comment-16587959
 ] 

ASF GitHub Bot commented on KAFKA-7316:
---------------------------------------

tedyu closed pull request #5543: KAFKA-7316 Use of filter method in 
KTable.scala may result in StackOverflowError
URL: https://github.com/apache/kafka/pull/5543
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 65ea4903326..ab0c5d2aebd 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -40,6 +40,12 @@ object FunctionConversions {
     }
   }
 
+  implicit class ForeachActionFromFunction[K, V](val fa: (K, V) => Unit) 
extends AnyVal {
+    def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
+      override def apply(key: K, value: V): Unit = fa(key, value)
+    }
+  }
+
   implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends 
AnyVal {
     def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, 
VR] {
       override def apply(key: T, value: U): VR = f(key, value)
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index adc1850dc32..436a0c75d6a 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -173,7 +173,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#foreach`
    */
   def foreach(action: (K, V) => Unit): Unit =
-    inner.foreach((k: K, v: V) => action(k, v))
+    inner.foreach(action.asForeachAction)
 
   /**
    * Creates an array of `KStream` from this stream by branching the records 
in the original stream based on
@@ -575,5 +575,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KStream#peek`
    */
   def peek(action: (K, V) => Unit): KStream[K, V] =
-    inner.peek((k: K, v: V) => action(k, v))
+    inner.peek(action.asForeachAction)
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b66977193e1..42e7d4054ce 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -46,7 +46,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filter`
    */
   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
-    inner.filter(predicate(_, _))
+    inner.filter(predicate.asPredicate)
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] 
which satisfies the given
@@ -70,7 +70,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    * @see `org.apache.kafka.streams.kstream.KTable#filterNot`
    */
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
-    inner.filterNot(predicate(_, _))
+    inner.filterNot(predicate.asPredicate)
 
   /**
    * Create a new [[KTable]] that consists all records of this [[KTable]] 
which do <em>not</em> satisfy the given
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 3d1bab5d086..da5e154e96d 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -58,6 +58,12 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends StreamToTableJ
     val userClicksStream: KStream[String, Long] = 
builder.stream(userClicksTopic)
 
     val userRegionsTable: KTable[String, String] = 
builder.table(userRegionsTopic)
+    userRegionsTable.filter { (_, _) =>
+      true
+    }
+    userRegionsTable.filterNot { (_, _) =>
+      false
+    }
 
     // Compute the total per region by summing the individual click counts per 
region.
     val clicksPerRegion: KTable[String, Long] =


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use of filter method in KTable.scala may result in StackOverflowError
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7316
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7316
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Ted Yu
>            Priority: Major
>              Labels: scala
>         Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>      val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>      val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +    userRegionsTable.filter { case (_, count) => true }
>      // Compute the total per region by summing the individual click counts 
> per region.
>      val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to