This is an automated email from the ASF dual-hosted git repository.

engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new 5f2fb8d  fix Kafka flaky test (#105)
5f2fb8d is described below

commit 5f2fb8d73e9548313fad8a3e9bffa291b8d84839
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Mon Apr 8 17:34:26 2024 +0800

    fix Kafka flaky test (#105)
    
    * fix Kafka flaky test
    
    * assert on handle function
    
    * catch eager element
---
 .../kafka/internal/KafkaSourceProviderImplSpec.scala        | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git 
a/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
 
b/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
index cc84d3f..9552229 100644
--- 
a/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
+++ 
b/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
@@ -70,9 +70,10 @@ class KafkaSourceProviderImplSpec extends 
ScalaTestWithActorTestKit with LogCapt
       val metadataClient = new TestMetadataClientAdapter(partitions)
       val tp0 = new TopicPartition(topic, 0)
       val tp1 = new TopicPartition(topic, 1)
+      val totalPerPartition = 10
 
       val consumerRecords =
-        for (n <- 0 to 10; tp <- List(tp0, tp1))
+        for (n <- 0 to totalPerPartition; tp <- List(tp0, tp1))
           yield new ConsumerRecord(tp.topic(), tp.partition(), n, n.toString, 
n.toString)
 
       val consumerSource = Source(consumerRecords)
@@ -109,6 +110,12 @@ class KafkaSourceProviderImplSpec extends 
ScalaTestWithActorTestKit with LogCapt
           records.count(_.partition() == tp1.partition()) shouldBe 5
         }
 
+        // Because the source pushes to the handler (probe) before sinkProbe requests a pull,
+        // the probe may have buffered one arbitrary record ahead of time
+        val eagerMessage = probe.receiveMessage()
+        records = records ++ Set(eagerMessage)
+        val tp0Received = records.count(_.partition() == tp0.partition())
+        val tp0Expect = totalPerPartition - tp0Received
+
         // assign only tp0 to this projection
         provider.partitionHandler.onAssign(Set(tp0), null)
         provider.partitionHandler.onRevoke(Set(tp1), null)
@@ -120,10 +127,10 @@ class KafkaSourceProviderImplSpec extends 
ScalaTestWithActorTestKit with LogCapt
         // only records from partition 0 should remain, because the rest were 
filtered
         sinkProbe.request(5)
         sinkProbe.expectNextN(5)
-        records = probe.receiveMessages(5)
+        records = probe.receiveMessages(tp0Expect)
 
         withClue("checking: after rebalance processed records should only have 
records from partition 0") {
-          records.count(_.partition() == tp0.partition()) shouldBe 5
+          records.count(_.partition() == tp0.partition()) shouldBe tp0Expect
           records.count(_.partition() == tp1.partition()) shouldBe 0
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to