[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587959#comment-16587959 ]
ASF GitHub Bot commented on KAFKA-7316: --------------------------------------- tedyu closed pull request #5543: KAFKA-7316 Use of filter method in KTable.scala may result in StackOverflowError URL: https://github.com/apache/kafka/pull/5543 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala index 65ea4903326..ab0c5d2aebd 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala @@ -40,6 +40,12 @@ object FunctionConversions { } } + implicit class ForeachActionFromFunction[K, V](val fa: (K, V) => Unit) extends AnyVal { + def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] { + override def apply(key: K, value: V): Unit = fa(key, value) + } + } + implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal { def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] { override def apply(key: T, value: U): VR = f(key, value) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index adc1850dc32..436a0c75d6a 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -173,7 +173,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KStream#foreach` */ def foreach(action: (K, V) => Unit): Unit = - inner.foreach((k: K, v: V) => action(k, v)) + inner.foreach(action.asForeachAction) /** * Creates an array of `KStream` from this stream by branching the records in the original stream based on @@ -575,5 +575,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KStream#peek` */ def peek(action: (K, V) => Unit): KStream[K, V] = - inner.peek((k: K, v: V) => action(k, v)) + inner.peek(action.asForeachAction) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index b66977193e1..42e7d4054ce 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -46,7 +46,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filter` */ def filter(predicate: (K, V) => Boolean): KTable[K, V] = - inner.filter(predicate(_, _)) + inner.filter(predicate.asPredicate) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given @@ -70,7 +70,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filterNot` */ def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = - inner.filterNot(predicate(_, _)) + inner.filterNot(predicate.asPredicate) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which do <em>not</em> satisfy the given diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 3d1bab5d086..da5e154e96d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -58,6 +58,12 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) + userRegionsTable.filter { (_, _) => + true + } + userRegionsTable.filterNot { (_, _) => + false + } // Compute the total per region by summing the individual click counts per region. val clicksPerRegion: KTable[String, Long] = ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use of filter method in KTable.scala may result in StackOverflowError > --------------------------------------------------------------------- > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.0 > Reporter: Ted Yu > Priority: Major > Labels: scala > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > + userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)