[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617175#comment-16617175 ] Joan Goyeau commented on KAFKA-7316: The PR #5539 is now merged. There is no documentation change needed here since it's an internal change to fix the issue. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616834#comment-16616834 ] Ted Yu commented on KAFKA-7316: --- Can this be resolved ? > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590581#comment-16590581 ] Matthias J. Sax commented on KAFKA-7316: Meta question: it seems we hit multiple issues with Scala API in 2.0.0 – to what extend do we need to update the docs for 2.0.1 and 2.1.0? It seems the current PRs don't include any docs updates. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590456#comment-16590456 ] Guozhang Wang commented on KAFKA-7316: -- The PR needs to be rebased --- you can follow its status on the PR itself. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590450#comment-16590450 ] Ted Yu commented on KAFKA-7316: --- When would PR #5539 be merged ? > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590446#comment-16590446 ] Guozhang Wang commented on KAFKA-7316: -- [~yuzhih...@gmail.com] the other PR has been merged, I will wait for resolving this ticket until the PR for {{peek}} is merged too. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Fix For: 2.0.1, 2.1.0 > > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590442#comment-16590442 ] ASF GitHub Bot commented on KAFKA-7316: --- guozhangwang closed pull request #5538: KAFKA-7316 Fix Streams Scala filter recursive call URL: https://github.com/apache/kafka/pull/5538 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index a78d321c941..d41496fb21c 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -47,7 +47,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filter` */ def filter(predicate: (K, V) => Boolean): KTable[K, V] = -inner.filter(predicate(_, _)) +inner.filter(predicate.asPredicate) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given @@ -71,7 +71,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filterNot` */ def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = -inner.filterNot(predicate(_, _)) +inner.filterNot(predicate.asPredicate) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which do not satisfy the given diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala index 6a302b207a9..2e2132d14eb 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala @@ -29,6 +29,52 @@ import org.scalatest.{FlatSpec, Matchers} @RunWith(classOf[JUnitRunner]) class KStreamTest extends FlatSpec with Matchers with TestDriver { + "filter a KStream" should "filter records satisfying the predicate" in { +val builder = new StreamsBuilder() +val sourceTopic = "source" +val sinkTopic = "sink" + +builder.stream[String, String](sourceTopic).filter((_, value) => value != "value2").to(sinkTopic) + +val testDriver = createTestDriver(builder) + +testDriver.pipeRecord(sourceTopic, ("1", "value1")) +testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1" + +testDriver.pipeRecord(sourceTopic, ("2", "value2")) +testDriver.readRecord[String, String](sinkTopic) shouldBe null + +testDriver.pipeRecord(sourceTopic, ("3", "value3")) +testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3" + +testDriver.readRecord[String, String](sinkTopic) shouldBe null + +testDriver.close() + } + + "filterNot a KStream" should "filter records not satisfying the predicate" in { +val builder = new StreamsBuilder() +val sourceTopic = "source" +val sinkTopic = "sink" + +builder.stream[String, String](sourceTopic).filterNot((_, value) => value == "value2").to(sinkTopic) + +val testDriver = createTestDriver(builder) + +testDriver.pipeRecord(sourceTopic, ("1", "value1")) +testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1" + +testDriver.pipeRecord(sourceTopic, ("2", "value2")) +testDriver.readRecord[String, String](sinkTopic) shouldBe null + +testDriver.pipeRecord(sourceTopic, ("3", "value3")) +testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3" + +testDriver.readRecord[String, String](sinkTopic) shouldBe null + +testDriver.close() + } + "selectKey a KStream" should "select a new key" in { val builder = new StreamsBuilder() val sourceTopic = "source" @@ -44,6 +90,8 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { testDriver.pipeRecord(sourceTopic, ("1", "value2")) testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2" +testDriver.readRecord[String, String](sinkTopic) shouldBe null + testDriver.close() } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala index 8c88ff5066f..2e9c821ed80 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala @@ -29,6 +29,72 @@
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589389#comment-16589389 ] Joan Goyeau commented on KAFKA-7316: This [https://github.com/apache/kafka/pull/5539] also fixes a similar issue on foreach. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587961#comment-16587961 ] Ted Yu commented on KAFKA-7316: --- I closed my PR since there was an earlier PR addressing the same problem: https://github.com/apache/kafka/pull/5538 I will handle peek method in another PR. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Attachments: 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587959#comment-16587959 ] ASF GitHub Bot commented on KAFKA-7316: --- tedyu closed pull request #5543: KAFKA-7316 Use of filter method in KTable.scala may result in StackOverflowError URL: https://github.com/apache/kafka/pull/5543 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala index 65ea4903326..ab0c5d2aebd 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala @@ -40,6 +40,12 @@ object FunctionConversions { } } + implicit class ForeachActionFromFunction[K, V](val fa: (K, V) => Unit) extends AnyVal { +def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] { + override def apply(key: K, value: V): Unit = fa(key, value) +} + } + implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends AnyVal { def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, VR] { override def apply(key: T, value: U): VR = f(key, value) diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index adc1850dc32..436a0c75d6a 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -173,7 +173,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KStream#foreach` */ def foreach(action: (K, V) => Unit): Unit = -inner.foreach((k: K, v: V) => action(k, v)) +inner.foreach(action.asForeachAction) /** * Creates an array of `KStream` from this stream by branching the records in the original stream based on @@ -575,5 +575,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KStream#peek` */ def peek(action: (K, V) => Unit): KStream[K, V] = -inner.peek((k: K, v: V) => action(k, v)) +inner.peek(action.asForeachAction) } diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala index b66977193e1..42e7d4054ce 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala @@ -46,7 +46,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filter` */ def filter(predicate: (K, V) => Boolean): KTable[K, V] = -inner.filter(predicate(_, _)) +inner.filter(predicate.asPredicate) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which satisfies the given @@ -70,7 +70,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) { * @see `org.apache.kafka.streams.kstream.KTable#filterNot` */ def filterNot(predicate: (K, V) => Boolean): KTable[K, V] = -inner.filterNot(predicate(_, _)) +inner.filterNot(predicate.asPredicate) /** * Create a new [[KTable]] that consists all records of this [[KTable]] which do not satisfy the given diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 3d1bab5d086..da5e154e96d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -58,6 +58,12 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJ val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) +userRegionsTable.filter { (_, _) => + true +} +userRegionsTable.filterNot { (_, _) => + false +} // Compute the total per region by summing the
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586927#comment-16586927 ] ASF GitHub Bot commented on KAFKA-7316: --- tedyu opened a new pull request #5543: KAFKA-7316 Use of filter method in KTable.scala may result in StackOverflowError URL: https://github.com/apache/kafka/pull/5543 Due to lack of conversion to kstream Predicate, existing filter method in KTable.scala would result in StackOverflowError. This PR fixes the bug and adds calls in StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion to prevent regression. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Attachments: 7316.v1.txt, 7316.v2.txt, 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586909#comment-16586909 ] Ted Yu commented on KAFKA-7316: --- Patch v4 makes the code compile. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Attachments: 7316.v1.txt, 7316.v2.txt, 7316.v4.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586897#comment-16586897 ] Ted Yu commented on KAFKA-7316: --- Patch v2 reduces compilation errors to 20. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Attachments: 7316.v1.txt, 7316.v2.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586841#comment-16586841 ] Ted Yu commented on KAFKA-7316: --- [~guozhang] [~mjsax] : Can you take a look at patch v1 to see if the changes to SessionWindowedKStream.scala are acceptable ? If so, I can work through the rest of compilation errors. Thanks > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > Attachments: 7316.v1.txt > > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError
[ https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586738#comment-16586738 ] Ted Yu commented on KAFKA-7316: --- I was thinking about removing the implicit wrapKTable . Then the following compilation errors pop up (only a snippet, there are more): {code} /Users/tyu/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala:52: type mismatch; found : org.apache.kafka.streams.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],VR] required: org.apache.kafka.streams.scala.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],VR] inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, merger.asMerger, materialized) ^ /Users/tyu/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala:64: type mismatch; found : org.apache.kafka.streams.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],Long] required: org.apache.kafka.streams.scala.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],Long] inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, ByteArraySessionStore]]) {code} If modifying the individual places is acceptable, I can send a PR. > Use of filter method in KTable.scala may result in StackOverflowError > - > > Key: KAFKA-7316 > URL: https://issues.apache.org/jira/browse/KAFKA-7316 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Ted Yu >Priority: Major > Labels: scala > > In this thread: > http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+ > Druhin reported seeing StackOverflowError when using filter method from > KTable.scala > This can be reproduced with the following change: > {code} > diff --git > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > b/streams/streams-scala/src/test/scala > index 3d1bab5..e0a06f2 100644 > --- > a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > +++ > b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes > extends StreamToTableJ > val userClicksStream: KStream[String, Long] = > builder.stream(userClicksTopic) > val userRegionsTable: KTable[String, String] = > builder.table(userRegionsTopic) > +userRegionsTable.filter { case (_, count) => true } > // Compute the total per region by summing the individual click counts > per region. > val clicksPerRegion: KTable[String, Long] = > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)