[ 
https://issues.apache.org/jira/browse/KAFKA-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716720#comment-16716720
 ] 

ASF GitHub Bot commented on KAFKA-7708:
---------------------------------------

lodamar closed pull request #6019: KAFKA-7708: Fixed KTable tests using KStream 
API in scala tests
URL: https://github.com/apache/kafka/pull/6019
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
index dc080f13310..0ef50e383c9 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
@@ -34,26 +34,32 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
     val sourceTopic = "source"
     val sinkTopic = "sink"
 
-    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
-    table.filter((_, value) => value > 1).toStream.to(sinkTopic)
+    val table = builder.table[String, String](sourceTopic)
+    table.mapValues(_.length).filter((_, value) => value > 
5).toStream.to(sinkTopic)
 
     val testDriver = createTestDriver(builder)
 
     {
-      testDriver.pipeRecord(sourceTopic, ("1", "value1"))
-      val record = testDriver.readRecord[String, Long](sinkTopic)
+      testDriver.pipeRecord(sourceTopic, ("1", "firstvalue"))
+      val record = testDriver.readRecord[String, Int](sinkTopic)
       record.key shouldBe "1"
-      record.value shouldBe (null: java.lang.Long)
+      record.value shouldBe 10
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("1", "secondvalue"))
+      val record = testDriver.readRecord[String, Int](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe 11
     }
     {
-      testDriver.pipeRecord(sourceTopic, ("1", "value2"))
-      val record = testDriver.readRecord[String, Long](sinkTopic)
+      testDriver.pipeRecord(sourceTopic, ("1", "short"))
+      val record = testDriver.readRecord[String, Int](sinkTopic)
       record.key shouldBe "1"
-      record.value shouldBe 2
+      record.value shouldBe (null: java.lang.Long)
     }
     {
-      testDriver.pipeRecord(sourceTopic, ("2", "value1"))
-      val record = testDriver.readRecord[String, Long](sinkTopic)
+      testDriver.pipeRecord(sourceTopic, ("2", "val3"))
+      val record = testDriver.readRecord[String, Int](sinkTopic)
       record.key shouldBe "2"
       record.value shouldBe (null: java.lang.Long)
     }
@@ -67,30 +73,36 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
     val sourceTopic = "source"
     val sinkTopic = "sink"
 
-    val table = builder.stream[String, String](sourceTopic).groupBy((key, _) 
=> key).count()
-    table.filterNot((_, value) => value > 1).toStream.to(sinkTopic)
+    val table = builder.table[String, String](sourceTopic)
+    table.filterNot((_, value) => 
value.exists(_.isUpper)).toStream.to(sinkTopic)
 
     val testDriver = createTestDriver(builder)
 
     {
-      testDriver.pipeRecord(sourceTopic, ("1", "value1"))
-      val record = testDriver.readRecord[String, Long](sinkTopic)
+      testDriver.pipeRecord(sourceTopic, ("1", "FirstValue"))
+      val record = testDriver.readRecord[String, String](sinkTopic)
       record.key shouldBe "1"
-      record.value shouldBe 1
+      record.value shouldBe (null: java.lang.String)
     }
     {
-      testDriver.pipeRecord(sourceTopic, ("1", "value2"))
-      val record = testDriver.readRecord[String, Long](sinkTopic)
+      testDriver.pipeRecord(sourceTopic, ("1", "secondvalue"))
+      val record = testDriver.readRecord[String, String](sinkTopic)
       record.key shouldBe "1"
-      record.value shouldBe (null: java.lang.Long)
+      record.value shouldBe "secondvalue"
     }
     {
-      testDriver.pipeRecord(sourceTopic, ("2", "value1"))
-      val record = testDriver.readRecord[String, Long](sinkTopic)
+      testDriver.pipeRecord(sourceTopic, ("1", "Short"))
+      val record = testDriver.readRecord[String, String](sinkTopic)
+      record.key shouldBe "1"
+      record.value shouldBe (null: java.lang.String)
+    }
+    {
+      testDriver.pipeRecord(sourceTopic, ("2", "val"))
+      val record = testDriver.readRecord[String, String](sinkTopic)
       record.key shouldBe "2"
-      record.value shouldBe 1
+      record.value shouldBe "val"
     }
-    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+    testDriver.readRecord[String, String](sinkTopic) shouldBe null
 
     testDriver.close()
   }
@@ -101,17 +113,17 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
     val sourceTopic2 = "source2"
     val sinkTopic = "sink"
 
-    val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) 
=> key).count()
-    val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) 
=> key).count()
+    val table1 = builder.table[String, Int](sourceTopic1)
+    val table2 = builder.table[String, Int](sourceTopic2)
     table1.join(table2)((a, b) => a + b).toStream.to(sinkTopic)
 
     val testDriver = createTestDriver(builder)
 
-    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
-    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
-    testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
+    testDriver.pipeRecord(sourceTopic1, ("1", 3))
+    testDriver.pipeRecord(sourceTopic2, ("1", 2))
+    testDriver.readRecord[String, Int](sinkTopic).value shouldBe 5
 
-    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+    testDriver.readRecord[String, Int](sinkTopic) shouldBe null
 
     testDriver.close()
   }
@@ -122,20 +134,20 @@ class KTableTest extends FlatSpec with Matchers with 
TestDriver {
     val sourceTopic2 = "source2"
     val sinkTopic = "sink"
     val stateStore = "store"
-    val materialized = Materialized.as[String, Long, 
ByteArrayKeyValueStore](stateStore)
+    val materialized = Materialized.as[String, Int, 
ByteArrayKeyValueStore](stateStore)
 
-    val table1 = builder.stream[String, String](sourceTopic1).groupBy((key, _) 
=> key).count()
-    val table2 = builder.stream[String, String](sourceTopic2).groupBy((key, _) 
=> key).count()
+    val table1 = builder.table[String, Int](sourceTopic1)
+    val table2 = builder.table[String, Int](sourceTopic2)
     table1.join(table2, materialized)((a, b) => a + b).toStream.to(sinkTopic)
 
     val testDriver = createTestDriver(builder)
 
-    testDriver.pipeRecord(sourceTopic1, ("1", "topic1value1"))
-    testDriver.pipeRecord(sourceTopic2, ("1", "topic2value1"))
-    testDriver.readRecord[String, Long](sinkTopic).value shouldBe 2
-    testDriver.getKeyValueStore[String, Long](stateStore).get("1") shouldBe 2
+    testDriver.pipeRecord(sourceTopic1, ("1", 1))
+    testDriver.pipeRecord(sourceTopic2, ("1", 3))
+    testDriver.readRecord[String, Int](sinkTopic).value shouldBe 4
+    testDriver.getKeyValueStore[String, Int](stateStore).get("1") shouldBe 4
 
-    testDriver.readRecord[String, Long](sinkTopic) shouldBe null
+    testDriver.readRecord[String, Int](sinkTopic) shouldBe null
 
     testDriver.close()
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> [kafka-streams-scala] Invalid signature for KTable join in 2.12
> ---------------------------------------------------------------
>
>                 Key: KAFKA-7708
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7708
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Edmondo Porcu
>            Priority: Major
>              Labels: scala
>
> The signature in Scala 2.12 for the join in the 
> org.kafka.streams.scala.streams.KTable cannot be resolved by the compiler, 
> probably due to the way parameters lists are handled by the compiler .
> See:
>  
> [https://github.com/scala/bug/issues/11288]
> [https://stackoverflow.com/questions/53615950/scalac-2-12-fails-to-resolve-overloaded-methods-with-multiple-argument-lists-whe]
>  
> We are wondering how this is not captured by the current build of Kafka, we 
> are building on 2.12.7 as well



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to