[
https://issues.apache.org/jira/browse/KAFKA-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317394#comment-16317394
]
ASF GitHub Bot commented on KAFKA-6096:
---
hachikuji closed pull request #4122: KAFKA-6096: Add multi-threaded tests for
group coordinator, txn manager
URL: https://github.com/apache/kafka/pull/4122
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
new file mode 100644
index 000..0ecc3f538b1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.{ Collections, Random }
+import java.util.concurrent.{ ConcurrentHashMap, Executors }
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.Lock
+
+import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
+import kafka.log.Log
+import kafka.server._
+import kafka.utils._
+import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{ MemoryRecords, RecordBatch,
RecordsProcessingStats }
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.easymock.EasyMock
+import org.junit.{ After, Before }
+
+import scala.collection._
+import scala.collection.JavaConverters._
+
+abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] {
+
+ val nThreads = 5 // size of the fixed thread pool created for `executor` below
+
+ val time = new MockTime // mock clock; also handed to `scheduler`
+ val timer = new MockTimer // passed to replicaManager.createDelayedProducePurgatory in setUp
+ val executor = Executors.newFixedThreadPool(nThreads) // runs the submitted actions; shut down in tearDown
+ val scheduler = new MockScheduler(time) // ticked manually from enableCompletion()
+ var replicaManager: TestReplicaManager = _ // assigned in setUp (partial mock); null until then
+ var zkClient: KafkaZkClient = _ // assigned in setUp (EasyMock nice mock); null until then
+ val serverProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") // broker config for node 0 with an empty zkConnect string
+ val random = new Random // not used in the visible part of this class; presumably for subclasses/helpers - TODO confirm
+
+ /**
+  * Creates fresh collaborators before each test: a partial mock of
+  * TestReplicaManager, whose delayed-produce purgatory is created with the
+  * shared mock timer, and a nice mock of KafkaZkClient.
+  */
+ @Before
+ def setUp(): Unit = {
+   replicaManager = EasyMock.partialMockBuilder(classOf[TestReplicaManager]).createMock()
+   replicaManager.createDelayedProducePurgatory(timer)
+
+   zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+ }
+
+ /**
+  * Resets the replica manager mock and stops the worker pool after each test.
+  *
+  * The pool is shut down in a `finally` block so that a failure while
+  * resetting the mock cannot skip the executor shutdown.
+  */
+ @After
+ def tearDown(): Unit = {
+   try {
+     // replicaManager is only assigned in setUp, so it is still null if setUp
+     // failed before reaching the assignment.
+     if (replicaManager != null)
+       EasyMock.reset(replicaManager)
+   } finally {
+     if (executor != null)
+       executor.shutdownNow()
+   }
+ }
+
+ /**
+  * Verify that concurrent operations run in the normal sequence produce the expected results.
+  *
+  * @param createMembers factory for the members under test; invoked once with the
+  *                      string "verifyConcurrentOperations"
+  * @param operations    operations handed to the ordered operation sequence
+  */
+ def verifyConcurrentOperations(createMembers: String => Set[M], operations: Seq[Operation]): Unit = {
+   val members = createMembers("verifyConcurrentOperations")
+   OrderedOperationSequence(members, operations).run()
+ }
+
+ /**
+  * Verify that arbitrary operations run in some random sequence don't leave the coordinator
+  * in a bad state. Operations in the normal sequence should continue to work as expected.
+  *
+  * The replica manager mock is reset once up front. Then 11 rounds are run (indices 0 to 10
+  * inclusive), each consisting of a random sequence followed by an ordered sequence, with
+  * members created separately for each sequence.
+  *
+  * @param createMembers factory for the members under test; invoked with "random<i>" and
+  *                      "ordered<i>" for each round index i
+  * @param operations    operations handed to both the random and the ordered sequences
+  */
+ def verifyConcurrentRandomSequences(createMembers: String => Set[M], operations: Seq[Operation]): Unit = {
+   EasyMock.reset(replicaManager)
+   (0 to 10).foreach { round =>
+     // Run some random operations
+     RandomOperationSequence(createMembers(s"random$round"), operations).run()
+
+     // Check that proper sequences still work correctly
+     OrderedOperationSequence(createMembers(s"ordered$round"), operations).run()
+   }
+ }
+
+ /**
+  * Runs a set of actions concurrently and waits for them to finish.
+  *
+  * Every action is submitted to the executor, and the submitted tasks are
+  * waited on via their futures. Completion is then enabled through
+  * `enableCompletion()` and, finally, `await()` is called on each action.
+  *
+  * @param actions the actions to submit to the executor and then await
+  */
+ def verifyConcurrentActions(actions: Set[Action]): Unit = {
+   val futures = actions.map(executor.submit)
+   // Only the blocking side effect of get is needed, so iterate rather than
+   // building a collection of results.
+   futures.foreach(_.get)
+   enableCompletion()
+   actions.foreach(_.await())
+ }
+
+ /**
+  * Asks the replica manager to try completing its delayed requests and then
+  * ticks the mock scheduler, in that order.
+  */
+ def enableCompletion(): Unit = {
+   replicaManager.tryCompleteDelayedRequests()
+   scheduler.tick()
+ }
+
+ /**
+  * A sequence of action sets to be verified one set after another.
+  *
+  * Subclasses supply `actionSequence`; `run()` passes each set in turn to
+  * `verifyConcurrentActions`.
+  */
+ abstract class OperationSequence(members: Set[M], operations: Seq[Operation]) {
+   /** The action sets to verify, in execution order. */
+   def actionSequence: Seq[Set[Action]]
+
+   /** Verifies every action set of `actionSequence`, one set at a time. */
+   def run(): Unit = {
+     for (concurrentActions <- actionSequence)
+       verifyConcurrentActions(concurrentActions)
+   }
+ }
+