This is an automated email from the ASF dual-hosted git repository.
engelen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new 5f2fb8d fix Kafka flaky test (#105)
5f2fb8d is described below
commit 5f2fb8d73e9548313fad8a3e9bffa291b8d84839
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Mon Apr 8 17:34:26 2024 +0800
fix Kafka flaky test (#105)
* fix Kafka flaky test
* assert on handle function
* catch eager element
---
.../kafka/internal/KafkaSourceProviderImplSpec.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git
a/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
b/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
index cc84d3f..9552229 100644
---
a/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
+++
b/kafka-test/src/test/scala/org/apache/pekko/projection/kafka/internal/KafkaSourceProviderImplSpec.scala
@@ -70,9 +70,10 @@ class KafkaSourceProviderImplSpec extends
ScalaTestWithActorTestKit with LogCapt
val metadataClient = new TestMetadataClientAdapter(partitions)
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
+ val totalPerPartition = 10
val consumerRecords =
- for (n <- 0 to 10; tp <- List(tp0, tp1))
+ for (n <- 0 to totalPerPartition; tp <- List(tp0, tp1))
yield new ConsumerRecord(tp.topic(), tp.partition(), n, n.toString,
n.toString)
val consumerSource = Source(consumerRecords)
@@ -109,6 +110,12 @@ class KafkaSourceProviderImplSpec extends
ScalaTestWithActorTestKit with LogCapt
records.count(_.partition() == tp1.partition()) shouldBe 5
}
+ // because source push to handle(probe) before sinkProbe request pull,
it made probe cache random one record
+ val eagerMessage = probe.receiveMessage()
+ records = records ++ Set(eagerMessage)
+ val tp0Received = records.count(_.partition() == tp0.partition())
+ val tp0Expect = totalPerPartition - tp0Received
+
// assign only tp0 to this projection
provider.partitionHandler.onAssign(Set(tp0), null)
provider.partitionHandler.onRevoke(Set(tp1), null)
@@ -120,10 +127,10 @@ class KafkaSourceProviderImplSpec extends
ScalaTestWithActorTestKit with LogCapt
// only records from partition 0 should remain, because the rest were
filtered
sinkProbe.request(5)
sinkProbe.expectNextN(5)
- records = probe.receiveMessages(5)
+ records = probe.receiveMessages(tp0Expect)
withClue("checking: after rebalance processed records should only have
records from partition 0") {
- records.count(_.partition() == tp0.partition()) shouldBe 5
+ records.count(_.partition() == tp0.partition()) shouldBe tp0Expect
records.count(_.partition() == tp1.partition()) shouldBe 0
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]