hachikuji commented on a change in pull request #9418:
URL: https://github.com/apache/kafka/pull/9418#discussion_r509633527



##########
File path: core/src/main/scala/kafka/tools/TestRaftServer.scala
##########
@@ -272,7 +291,79 @@ class TestRaftServer(val config: KafkaConfig) extends 
Logging {
     )
   }
 
-  class RaftIoThread(client: KafkaRaftClient) extends 
ShutdownableThread("raft-io-thread") {
+  class RaftWorkloadGenerator(
+    client: KafkaRaftClient[Array[Byte]],
+    time: Time,
+    brokerId: Int,
+    recordsPerSec: Int,
+    recordSize: Int
+  ) extends ShutdownableThread(name = "raft-workload-generator") with 
RaftClient.Listener[Array[Byte]] {
+
+    private val stats = new WriteStats(time, printIntervalMs = 5000)
+    private val payload = new Array[Byte](recordSize)
+    private val pendingAppends = new util.ArrayDeque[PendingAppend]()
+
+    private var latestLeaderAndEpoch = new LeaderAndEpoch(OptionalInt.empty, 0)
+    private var isLeader = false
+    private var throttler: ThroughputThrottler = _
+    private var recordCount = 0
+
+    override def doWork(): Unit = {
+      if (latestLeaderAndEpoch != client.currentLeaderAndEpoch()) {
+        latestLeaderAndEpoch = client.currentLeaderAndEpoch()
+        isLeader = latestLeaderAndEpoch.leaderId.orElse(-1) == brokerId
+        if (isLeader) {
+          pendingAppends.clear()
+          throttler = new ThroughputThrottler(time, recordsPerSec)
+          recordCount = 0
+        }
+      }
+
+      if (isLeader) {
+        recordCount += 1
+
+        val startTimeMs = time.milliseconds()
+        val sendTimeMs = if (throttler.maybeThrottle(recordCount, 
startTimeMs)) {
+          time.milliseconds()
+        } else {
+          startTimeMs
+        }
+
+        val offset = client.scheduleAppend(latestLeaderAndEpoch.epoch, 
Collections.singletonList(payload))
+        if (offset == null || offset == Long.MaxValue) {
+          time.sleep(10)
+        } else {
+          pendingAppends.offer(PendingAppend(latestLeaderAndEpoch.epoch, 
offset, sendTimeMs))
+        }
+      } else {
+        time.sleep(500)
+      }
+    }
+
+    override def handleCommit(epoch: Int, lastOffset: Long, records: 
util.List[Array[Byte]]): Unit = {
+      var offset = lastOffset - records.size() + 1
+      val currentTimeMs = time.milliseconds()
+
+      for (record <- records.asScala) {
+        val pendingAppend = pendingAppends.poll()
+        if (pendingAppend.epoch != epoch || pendingAppend.offset!= offset) {
+          warn(s"Expected next commit at offset ${pendingAppend.offset}, " +

Review comment:
       That's true. In a follow-up, the `handleCommit` API will be expanded a 
bit to cover appends through replication as well, but for now, I think we can 
raise an error.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to