[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-09-17 Thread Joan Goyeau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617175#comment-16617175
 ] 

Joan Goyeau commented on KAFKA-7316:


The PR #5539 is now merged.

There is no documentation change needed here since it's an internal change to 
fix the issue.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-09-16 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616834#comment-16616834
 ] 

Ted Yu commented on KAFKA-7316:
---

Can this be resolved ?

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-23 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590581#comment-16590581
 ] 

Matthias J. Sax commented on KAFKA-7316:


Meta question: it seems we hit multiple issues with Scala API in 2.0.0 – to 
what extend do we need to update the docs for 2.0.1 and 2.1.0? It seems the 
current PRs don't include any docs updates.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590456#comment-16590456
 ] 

Guozhang Wang commented on KAFKA-7316:
--

The PR needs to be rebased --- you can follow its status on the PR itself.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-23 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590450#comment-16590450
 ] 

Ted Yu commented on KAFKA-7316:
---

When would PR #5539 be merged ?

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-23 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590446#comment-16590446
 ] 

Guozhang Wang commented on KAFKA-7316:
--

[~yuzhih...@gmail.com] the other PR has been merged, I will wait for resolving 
this ticket until the PR for {{peek}} is merged too.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Fix For: 2.0.1, 2.1.0
>
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590442#comment-16590442
 ] 

ASF GitHub Bot commented on KAFKA-7316:
---

guozhangwang closed pull request #5538: KAFKA-7316 Fix Streams Scala filter 
recursive call
URL: https://github.com/apache/kafka/pull/5538
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index a78d321c941..d41496fb21c 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -47,7 +47,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
-inner.filter(predicate(_, _))
+inner.filter(predicate.asPredicate)
 
   /**
* Create a new [[KTable]] that consists all records of this [[KTable]] 
which satisfies the given
@@ -71,7 +71,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
-inner.filterNot(predicate(_, _))
+inner.filterNot(predicate.asPredicate)
 
   /**
* Create a new [[KTable]] that consists all records of this [[KTable]] 
which do not satisfy the given
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
index 6a302b207a9..2e2132d14eb 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
@@ -29,6 +29,52 @@ import org.scalatest.{FlatSpec, Matchers}
 @RunWith(classOf[JUnitRunner])
 class KStreamTest extends FlatSpec with Matchers with TestDriver {
 
+  "filter a KStream" should "filter records satisfying the predicate" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+builder.stream[String, String](sourceTopic).filter((_, value) => value != 
"value2").to(sinkTopic)
+
+val testDriver = createTestDriver(builder)
+
+testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+testDriver.close()
+  }
+
+  "filterNot a KStream" should "filter records not satisfying the predicate" 
in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+builder.stream[String, String](sourceTopic).filterNot((_, value) => value 
== "value2").to(sinkTopic)
+
+val testDriver = createTestDriver(builder)
+
+testDriver.pipeRecord(sourceTopic, ("1", "value1"))
+testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
+
+testDriver.pipeRecord(sourceTopic, ("2", "value2"))
+testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+testDriver.pipeRecord(sourceTopic, ("3", "value3"))
+testDriver.readRecord[String, String](sinkTopic).value shouldBe "value3"
+
+testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
+testDriver.close()
+  }
+
   "selectKey a KStream" should "select a new key" in {
 val builder = new StreamsBuilder()
 val sourceTopic = "source"
@@ -44,6 +90,8 @@ class KStreamTest extends FlatSpec with Matchers with 
TestDriver {
 testDriver.pipeRecord(sourceTopic, ("1", "value2"))
 testDriver.readRecord[String, String](sinkTopic).key shouldBe "value2"
 
+testDriver.readRecord[String, String](sinkTopic) shouldBe null
+
 testDriver.close()
   }
 
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
index 8c88ff5066f..2e9c821ed80 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
@@ -29,6 +29,72 @@ 

[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-22 Thread Joan Goyeau (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16589389#comment-16589389
 ] 

Joan Goyeau commented on KAFKA-7316:


This [https://github.com/apache/kafka/pull/5539] also fixes a similar issue on 
foreach.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587961#comment-16587961
 ] 

Ted Yu commented on KAFKA-7316:
---

I closed my PR since there was an earlier PR addressing the same problem:

https://github.com/apache/kafka/pull/5538

I will handle peek method in another PR.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16587959#comment-16587959
 ] 

ASF GitHub Bot commented on KAFKA-7316:
---

tedyu closed pull request #5543: KAFKA-7316 Use of filter method in 
KTable.scala may result in StackOverflowError
URL: https://github.com/apache/kafka/pull/5543
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
index 65ea4903326..ab0c5d2aebd 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/FunctionConversions.scala
@@ -40,6 +40,12 @@ object FunctionConversions {
 }
   }
 
+  implicit class ForeachActionFromFunction[K, V](val fa: (K, V) => Unit) 
extends AnyVal {
+def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
+  override def apply(key: K, value: V): Unit = fa(key, value)
+}
+  }
+
   implicit class MapperFromFunction[T, U, VR](val f: (T, U) => VR) extends 
AnyVal {
 def asKeyValueMapper: KeyValueMapper[T, U, VR] = new KeyValueMapper[T, U, 
VR] {
   override def apply(key: T, value: U): VR = f(key, value)
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index adc1850dc32..436a0c75d6a 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -173,7 +173,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#foreach`
*/
   def foreach(action: (K, V) => Unit): Unit =
-inner.foreach((k: K, v: V) => action(k, v))
+inner.foreach(action.asForeachAction)
 
   /**
* Creates an array of `KStream` from this stream by branching the records 
in the original stream based on
@@ -575,5 +575,5 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KStream#peek`
*/
   def peek(action: (K, V) => Unit): KStream[K, V] =
-inner.peek((k: K, v: V) => action(k, v))
+inner.peek(action.asForeachAction)
 }
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index b66977193e1..42e7d4054ce 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -46,7 +46,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filter`
*/
   def filter(predicate: (K, V) => Boolean): KTable[K, V] =
-inner.filter(predicate(_, _))
+inner.filter(predicate.asPredicate)
 
   /**
* Create a new [[KTable]] that consists all records of this [[KTable]] 
which satisfies the given
@@ -70,7 +70,7 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
* @see `org.apache.kafka.streams.kstream.KTable#filterNot`
*/
   def filterNot(predicate: (K, V) => Boolean): KTable[K, V] =
-inner.filterNot(predicate(_, _))
+inner.filterNot(predicate.asPredicate)
 
   /**
* Create a new [[KTable]] that consists all records of this [[KTable]] 
which do not satisfy the given
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
index 3d1bab5d086..da5e154e96d 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
@@ -58,6 +58,12 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
extends StreamToTableJ
 val userClicksStream: KStream[String, Long] = 
builder.stream(userClicksTopic)
 
 val userRegionsTable: KTable[String, String] = 
builder.table(userRegionsTopic)
+userRegionsTable.filter { (_, _) =>
+  true
+}
+userRegionsTable.filterNot { (_, _) =>
+  false
+}
 
 // Compute the total per region by summing the 

[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586927#comment-16586927
 ] 

ASF GitHub Bot commented on KAFKA-7316:
---

tedyu opened a new pull request #5543: KAFKA-7316 Use of filter method in 
KTable.scala may result in StackOverflowError
URL: https://github.com/apache/kafka/pull/5543
 
 
   Due to lack of conversion to kstream Predicate, existing filter method in 
KTable.scala would result in StackOverflowError.
   
   This PR fixes the bug and adds calls in 
StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion
 to prevent regression.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v1.txt, 7316.v2.txt, 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586909#comment-16586909
 ] 

Ted Yu commented on KAFKA-7316:
---

Patch v4 makes the code compile.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v1.txt, 7316.v2.txt, 7316.v4.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586897#comment-16586897
 ] 

Ted Yu commented on KAFKA-7316:
---

Patch v2 reduces compilation errors to 20.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v1.txt, 7316.v2.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586841#comment-16586841
 ] 

Ted Yu commented on KAFKA-7316:
---

[~guozhang] [~mjsax] :
Can you take a look at patch v1 to see if the changes to 
SessionWindowedKStream.scala are acceptable ?

If so, I can work through the rest of compilation errors.

Thanks

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
> Attachments: 7316.v1.txt
>
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7316) Use of filter method in KTable.scala may result in StackOverflowError

2018-08-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586738#comment-16586738
 ] 

Ted Yu commented on KAFKA-7316:
---

I was thinking about removing the implicit wrapKTable .
Then the following compilation errors pop up (only a snippet, there are more):
{code}
/Users/tyu/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala:52:
 type mismatch;
 found   : 
org.apache.kafka.streams.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],VR]
 required: 
org.apache.kafka.streams.scala.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],VR]
inner.aggregate((() => initializer).asInitializer, aggregator.asAggregator, 
merger.asMerger, materialized)
   ^
/Users/tyu/kafka/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/SessionWindowedKStream.scala:64:
 type mismatch;
 found   : 
org.apache.kafka.streams.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],Long]
 required: 
org.apache.kafka.streams.scala.kstream.KTable[org.apache.kafka.streams.kstream.Windowed[K],Long]
  inner.count(materialized.asInstanceOf[Materialized[K, java.lang.Long, 
ByteArraySessionStore]])
{code}
If modifying the individual places is acceptable, I can send a PR.

> Use of filter method in KTable.scala may result in StackOverflowError
> -
>
> Key: KAFKA-7316
> URL: https://issues.apache.org/jira/browse/KAFKA-7316
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ted Yu
>Priority: Major
>  Labels: scala
>
> In this thread:
> http://search-hadoop.com/m/Kafka/uyzND1dNbYKXzC4F1?subj=Issue+in+Kafka+2+0+0+
> Druhin reported seeing StackOverflowError when using filter method from 
> KTable.scala
> This can be reproduced with the following change:
> {code}
> diff --git 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
>  b/streams/streams-scala/src/test/scala
> index 3d1bab5..e0a06f2 100644
> --- 
> a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> +++ 
> b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala
> @@ -58,6 +58,7 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes 
> extends StreamToTableJ
>  val userClicksStream: KStream[String, Long] = 
> builder.stream(userClicksTopic)
>  val userRegionsTable: KTable[String, String] = 
> builder.table(userRegionsTopic)
> +userRegionsTable.filter { case (_, count) => true }
>  // Compute the total per region by summing the individual click counts 
> per region.
>  val clicksPerRegion: KTable[String, Long] =
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)