[jira] [Commented] (KAFKA-6096) Add concurrent tests to exercise all paths in group/transaction managers

2018-01-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317394#comment-16317394
 ] 

ASF GitHub Bot commented on KAFKA-6096:
---

hachikuji closed pull request #4122: KAFKA-6096: Add multi-threaded tests for 
group coordinator, txn manager
URL: https://github.com/apache/kafka/pull/4122
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
new file mode 100644
index 000..0ecc3f538b1
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.{ Collections, Random }
+import java.util.concurrent.{ ConcurrentHashMap, Executors }
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.locks.Lock
+
+import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
+import kafka.log.Log
+import kafka.server._
+import kafka.utils._
+import kafka.utils.timer.MockTimer
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{ MemoryRecords, RecordBatch, RecordsProcessingStats }
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.easymock.EasyMock
+import org.junit.{ After, Before }
+
+import scala.collection._
+import scala.collection.JavaConverters._
+
+abstract class AbstractCoordinatorConcurrencyTest[M <: CoordinatorMember] {
+
+  val nThreads = 5
+
+  val time = new MockTime
+  val timer = new MockTimer
+  val executor = Executors.newFixedThreadPool(nThreads)
+  val scheduler = new MockScheduler(time)
+  var replicaManager: TestReplicaManager = _
+  var zkClient: KafkaZkClient = _
+  val serverProps = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+  val random = new Random
+
+  @Before
+  def setUp() {
+
+    replicaManager = EasyMock.partialMockBuilder(classOf[TestReplicaManager]).createMock()
+    replicaManager.createDelayedProducePurgatory(timer)
+
+    zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  }
+
+  @After
+  def tearDown() {
+    EasyMock.reset(replicaManager)
+    if (executor != null)
+      executor.shutdownNow()
+  }
+
+  /**
+    * Verify that concurrent operations run in the normal sequence produce the expected results.
+    */
+  def verifyConcurrentOperations(createMembers: String => Set[M], operations: Seq[Operation]) {
+    OrderedOperationSequence(createMembers("verifyConcurrentOperations"), operations).run()
+  }
+
+  /**
+    * Verify that arbitrary operations run in some random sequence don't leave the coordinator
+    * in a bad state. Operations in the normal sequence should continue to work as expected.
+    */
+  def verifyConcurrentRandomSequences(createMembers: String => Set[M], operations: Seq[Operation]) {
+    EasyMock.reset(replicaManager)
+    for (i <- 0 to 10) {
+      // Run some random operations
+      RandomOperationSequence(createMembers(s"random$i"), operations).run()
+
+      // Check that proper sequences still work correctly
+      OrderedOperationSequence(createMembers(s"ordered$i"), operations).run()
+    }
+  }
+
+  def verifyConcurrentActions(actions: Set[Action]) {
+    val futures = actions.map(executor.submit)
+    futures.map(_.get)
+    enableCompletion()
+    actions.foreach(_.await())
+  }
+
+  def enableCompletion(): Unit = {
+    replicaManager.tryCompleteDelayedRequests()
+    scheduler.tick()
+  }
+
+  abstract class OperationSequence(members: Set[M], operations: Seq[Operation]) {
+    def actionSequence: Seq[Set[Action]]
+    def run(): Unit = {
+      actionSequence.foreach(verifyConcurrentActions)
+    }
+  }
+
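
The verifyConcurrentActions method in the diff above submits every action to a fixed thread pool, waits for all submissions to return, and only then awaits each action's completion. A minimal, self-contained sketch of that pattern follows. CountingAction and runAll are hypothetical names introduced for illustration, not part of the PR, and the real test also triggers delayed-request completion between the two waits, which is omitted here.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ConcurrentActionsSketch {
  // Hypothetical stand-in for the Action type referenced in the diff.
  final class CountingAction(counter: AtomicInteger) extends Runnable {
    private val done = new CountDownLatch(1)

    override def run(): Unit = {
      counter.incrementAndGet()
      done.countDown()
    }

    // Returns true if the action completed within the timeout.
    def await(): Boolean = done.await(5, TimeUnit.SECONDS)
  }

  // Runs nActions actions on nThreads threads and returns how many ran.
  def runAll(nThreads: Int, nActions: Int): Int = {
    val executor = Executors.newFixedThreadPool(nThreads)
    try {
      val counter = new AtomicInteger(0)
      val actions = (1 to nActions).map(_ => new CountingAction(counter))
      // Submit everything first so the actions can overlap.
      val futures = actions.map(a => executor.submit(a))
      // Bounded waits: a stuck action fails the call instead of hanging it.
      futures.foreach(_.get(5, TimeUnit.SECONDS))
      require(actions.forall(_.await()), "an action did not complete")
      counter.get
    } finally executor.shutdownNow()
  }
}
```

Calling ConcurrentActionsSketch.runAll(5, 20) runs twenty actions on five threads and returns the number that executed.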

[jira] [Commented] (KAFKA-6096) Add concurrent tests to exercise all paths in group/transaction managers

2017-10-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16215941#comment-16215941
 ] 

ASF GitHub Bot commented on KAFKA-6096:
---

GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/4122

KAFKA-6096: Add multi-threaded tests for group coordinator, txn manager



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka KAFKA-6096-deadlock-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4122.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4122


commit 5e39e73da84c0057ae4815b066cbc6e9113bc608
Author: Rajini Sivaram 
Date:   2017-10-23T20:59:04Z

KAFKA-6096: Add multi-threaded tests for group coordinator, txn manager




> Add concurrent tests to exercise all paths in group/transaction managers
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-6096
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6096
>             Project: Kafka
>          Issue Type: Test
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Jason Gustafson
>             Fix For: 1.1.0
>
>
> We don't have enough tests covering locking and deadlocks in
> GroupMetadataManager and TransactionManager. Several deadlocks (KAFKA-5970,
> KAFKA-6042, etc.) were not detected during testing, so we should add more
> mock-based tests that exercise these classes concurrently to verify the locking.
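
One way to make such a test fail rather than hang when locking goes wrong is to bound every wait with a timeout. The sketch below assumes nothing about GroupMetadataManager or TransactionManager. LockOrderSketch, completesWithin and lockBoth are hypothetical helpers that use plain ReentrantLocks to show the idea.

```scala
import java.util.concurrent.{Executors, Future, TimeUnit, TimeoutException}
import java.util.concurrent.locks.ReentrantLock

object LockOrderSketch {
  // Returns true if both tasks finish within the timeout, false if they appear stuck.
  def completesWithin(timeoutMs: Long)(first: Runnable, second: Runnable): Boolean = {
    val executor = Executors.newFixedThreadPool(2)
    try {
      val futures: Seq[Future[_]] = Seq(executor.submit(first), executor.submit(second))
      try {
        futures.foreach(_.get(timeoutMs, TimeUnit.MILLISECONDS))
        true
      } catch {
        case _: TimeoutException => false
      }
    } finally executor.shutdownNow() // interrupts any task still waiting on a lock
  }

  // A task that takes lock a, then lock b. lockInterruptibly is used so that
  // shutdownNow() can release a thread that is stuck waiting.
  def lockBoth(a: ReentrantLock, b: ReentrantLock): Runnable = new Runnable {
    override def run(): Unit = {
      a.lockInterruptibly()
      try {
        b.lockInterruptibly()
        try {
          // Critical section would go here.
        } finally b.unlock()
      } finally a.unlock()
    }
  }
}
```

Two tasks that take the locks in the same order finish promptly, so completesWithin returns true. Tasks that take them in opposite orders may deadlock, in which case the call returns false after the timeout instead of blocking the test run.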



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)