Repository: kafka Updated Branches: refs/heads/trunk 517870271 -> f88fdbd31
KAFKA-6072; Use ZookeeperClient in GroupCoordinator and TransactionCoordinator Author: Manikumar Reddy <manikumar.re...@gmail.com> Reviewers: Ted Yu <yuzhih...@gmail.com>, Jun Rao <jun...@gmail.com> Closes #4126 from omkreddy/KAFKA-6072-ZK-IN-GRoupCoordinator Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f88fdbd3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f88fdbd3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f88fdbd3 Branch: refs/heads/trunk Commit: f88fdbd3115cdb0f1bd26817513f3d33359512b1 Parents: 5178702 Author: Manikumar Reddy <manikumar.re...@gmail.com> Authored: Tue Oct 31 18:06:51 2017 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Oct 31 18:06:51 2017 -0700 ---------------------------------------------------------------------- .../coordinator/group/GroupCoordinator.scala | 11 +-- .../group/GroupMetadataManager.scala | 5 +- .../transaction/ProducerIdManager.scala | 11 +-- .../transaction/TransactionCoordinator.scala | 9 ++- .../transaction/TransactionStateManager.scala | 5 +- .../main/scala/kafka/server/KafkaServer.scala | 4 +- .../src/main/scala/kafka/zk/KafkaZkClient.scala | 82 +++++++++++++++++++- .../group/GroupCoordinatorTest.scala | 11 +-- .../group/GroupMetadataManagerTest.scala | 14 ++-- .../transaction/ProducerIdManagerTest.scala | 24 +++--- .../TransactionStateManagerTest.scala | 13 ++-- .../scala/unit/kafka/zk/KafkaZkClientTest.scala | 74 +++++++++++++++++- 12 files changed, 210 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index ed13a08..129eae4 
100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -23,7 +23,8 @@ import kafka.common.OffsetAndMetadata import kafka.log.LogConfig import kafka.message.ProducerCompressionCodec import kafka.server._ -import kafka.utils._ +import kafka.utils.Logging +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors @@ -819,12 +820,12 @@ object GroupCoordinator { val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers) def apply(config: KafkaConfig, - zkUtils: ZkUtils, + zkClient: KafkaZkClient, replicaManager: ReplicaManager, time: Time): GroupCoordinator = { val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId) val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId) - apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time) + apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time) } private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig( @@ -841,7 +842,7 @@ object GroupCoordinator { ) def apply(config: KafkaConfig, - zkUtils: ZkUtils, + zkClient: KafkaZkClient, replicaManager: ReplicaManager, heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat], joinPurgatory: DelayedOperationPurgatory[DelayedJoin], @@ -852,7 +853,7 @@ object GroupCoordinator { groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay) val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion, - offsetConfig, replicaManager, zkUtils, time) + offsetConfig, replicaManager, zkClient, time) new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time) } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 67a048d..7e6a643 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock import kafka.utils._ +import kafka.zk.KafkaZkClient import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic @@ -50,7 +51,7 @@ class GroupMetadataManager(brokerId: Int, interBrokerProtocolVersion: ApiVersion, config: OffsetConfig, replicaManager: ReplicaManager, - zkUtils: ZkUtils, + zkClient: KafkaZkClient, time: Time) extends Logging with KafkaMetricsGroup { private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec) @@ -842,7 +843,7 @@ class GroupMetadataManager(brokerId: Int, * If the topic does not exist, the configured partition count is returned. 
*/ private def getGroupMetadataTopicPartitionCount: Int = { - zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions) + zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index f7bde96..6be3c6b 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -18,6 +18,7 @@ package kafka.coordinator.transaction import kafka.common.KafkaException import kafka.utils.{Json, Logging, ZkUtils} +import kafka.zk.KafkaZkClient /** * ProducerIdManager is the part of the transaction coordinator that provides ProducerIds in a unique way @@ -65,7 +66,7 @@ case class ProducerIdBlock(brokerId: Int, blockStartId: Long, blockEndId: Long) } } -class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging { +class ProducerIdManager(val brokerId: Int, val zkClient: KafkaZkClient) extends Logging { this.logIdent = "[ProducerId Manager " + brokerId + "]: " @@ -82,7 +83,7 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging var zkWriteComplete = false while (!zkWriteComplete) { // refresh current producerId block from zookeeper again - val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath) + val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath) // generate the new producerId block currentProducerIdBlock = dataOpt match { @@ -105,7 +106,7 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: 
ZkUtils) extends Logging val newProducerIdBlockData = ProducerIdManager.generateProducerIdBlockJson(currentProducerIdBlock) // try to write the new producerId block into zookeeper - val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.ProducerIdBlockPath, + val (succeeded, version) = zkClient.conditionalUpdatePath(ZkUtils.ProducerIdBlockPath, newProducerIdBlockData, zkVersion, Some(checkProducerIdBlockZkData)) zkWriteComplete = succeeded @@ -114,10 +115,10 @@ class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging } } - private def checkProducerIdBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = { + private def checkProducerIdBlockZkData(zkClient: KafkaZkClient, path: String, expectedData: String): (Boolean, Int) = { try { val expectedPidBlock = ProducerIdManager.parseProducerIdBlockData(expectedData) - val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.ProducerIdBlockPath) + val (dataOpt, zkVersion) = zkClient.getDataAndVersion(ZkUtils.ProducerIdBlockPath) dataOpt match { case Some(data) => val currProducerIdBLock = ProducerIdManager.parseProducerIdBlockData(data) http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala index 6ad1f40..2cc719d 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala @@ -20,7 +20,8 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean import kafka.server.{DelayedOperationPurgatory, KafkaConfig, MetadataCache, ReplicaManager} -import kafka.utils.{Logging, Scheduler, 
ZkUtils} +import kafka.utils.{Logging, Scheduler} +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics @@ -34,7 +35,7 @@ object TransactionCoordinator { def apply(config: KafkaConfig, replicaManager: ReplicaManager, scheduler: Scheduler, - zkUtils: ZkUtils, + zkClient: KafkaZkClient, metrics: Metrics, metadataCache: MetadataCache, time: Time): TransactionCoordinator = { @@ -50,11 +51,11 @@ object TransactionCoordinator { config.transactionRemoveExpiredTransactionalIdCleanupIntervalMs, config.requestTimeoutMs) - val producerIdManager = new ProducerIdManager(config.brokerId, zkUtils) + val producerIdManager = new ProducerIdManager(config.brokerId, zkClient) // we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId, reaperEnabled = false, timerEnabled = false) - val txnStateManager = new TransactionStateManager(config.brokerId, zkUtils, scheduler, replicaManager, txnConfig, time) + val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time) val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ") val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager, http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index f2e25c4..b962e82 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala 
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -29,6 +29,7 @@ import kafka.server.Defaults import kafka.server.ReplicaManager import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{Logging, Pool, Scheduler, ZkUtils} +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors @@ -68,7 +69,7 @@ object TransactionStateManager { * </ul> */ class TransactionStateManager(brokerId: Int, - zkUtils: ZkUtils, + zkClient: KafkaZkClient, scheduler: Scheduler, replicaManager: ReplicaManager, config: TransactionConfig, @@ -282,7 +283,7 @@ class TransactionStateManager(brokerId: Int, * If the topic does not exist, the default partition count is returned. */ private def getTransactionTopicPartitionCount: Int = { - zkUtils.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions) + zkClient.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions) } private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): Pool[String, TransactionMetadata] = { http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e870ce4..f8111ff 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -259,12 +259,12 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP /* start group coordinator */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue - groupCoordinator = GroupCoordinator(config, zkUtils, 
replicaManager, Time.SYSTEM) + groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM) groupCoordinator.startup() /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue - transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkUtils, metrics, metadataCache, Time.SYSTEM) + transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM) transactionCoordinator.startup() /* Get the authorizer and initialize it if one is specified.*/ http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/main/scala/kafka/zk/KafkaZkClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index 925a6f6..026dc9d 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -16,6 +16,7 @@ */ package kafka.zk +import java.nio.charset.StandardCharsets.UTF_8 import java.util.Properties import kafka.api.LeaderAndIsr @@ -26,13 +27,13 @@ import kafka.log.LogConfig import kafka.server.ConfigType import kafka.utils._ import kafka.zookeeper._ +import org.apache.kafka.common.TopicPartition import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.Stat import org.apache.zookeeper.{CreateMode, KeeperException} import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.kafka.common.TopicPartition /** * Provides higher level Kafka-specific operations on top of the pipelined 
[[kafka.zookeeper.ZooKeeperClient]]. @@ -311,6 +312,85 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends } /** + * Gets the partition count for a given topic + * @param topic The topic to get partition count for. + * @return optional integer that is Some if the topic exists and None otherwise. + */ + def getTopicPartitionCount(topic: String): Option[Int] = { + val topicData = getReplicaAssignmentForTopics(Set(topic)) + if (topicData.nonEmpty) + Some(topicData.size) + else + None + } + + /** + * Gets the data and version at the given zk path + * @param path zk node path + * @return A tuple of 2 elements, where first element is zk node data as string + * and second element is zk node version. + * returns (None, -1) if node doesn't exist and throws exception for any error + */ + def getDataAndVersion(path: String): (Option[String], Int) = { + val getDataRequest = GetDataRequest(path) + val getDataResponse = retryRequestUntilConnected(getDataRequest) + + if (getDataResponse.resultCode == Code.OK) { + if (getDataResponse.data == null) + (None, getDataResponse.stat.getVersion) + else { + val data = new String(getDataResponse.data, UTF_8) + (Some(data), getDataResponse.stat.getVersion) + } + } else if (getDataResponse.resultCode == Code.NONODE) + (None, -1) + else + throw getDataResponse.resultException.get + } + + /** + * Conditionally update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't + * exist, the current version is not the expected version, etc.) return (false, -1) + * + * When there is a ConnectionLossException during the conditional update, ZookeeperClient will retry the update and may fail + * since the previous update may have succeeded (but the stored zkVersion no longer matches the expected one). + * In this case, we will run the optionalChecker to further check if the previous write did indeed succeed. 
+ */ + def conditionalUpdatePath(path: String, data: String, expectVersion: Int, + optionalChecker:Option[(KafkaZkClient, String, String) => (Boolean,Int)] = None): (Boolean, Int) = { + + val setDataRequest = SetDataRequest(path, data.getBytes(UTF_8), expectVersion) + val setDataResponse = retryRequestUntilConnected(setDataRequest) + + setDataResponse.resultCode match { + case Code.OK => + debug("Conditional update of path %s with value %s and expected version %d succeeded, returning the new version: %d" + .format(path, data, expectVersion, setDataResponse.stat.getVersion)) + (true, setDataResponse.stat.getVersion) + + case Code.BADVERSION => + optionalChecker match { + case Some(checker) => checker(this, path, data) + case _ => + debug("Checker method is not passed skipping zkData match") + debug("Conditional update of path %s with data %s and expected version %d failed due to %s" + .format(path, data, expectVersion, setDataResponse.resultException.get.getMessage)) + (false, -1) + } + + case Code.NONODE => + debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + expectVersion, setDataResponse.resultException.get.getMessage)) + (false, -1) + + case _ => + debug("Conditional update of path %s with data %s and expected version %d failed due to %s".format(path, data, + expectVersion, setDataResponse.resultException.get.getMessage)) + throw setDataResponse.resultException.get + } + } + + /** * Get all topics marked for deletion. * @return sequence of topics marked for deletion. 
*/ http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 22efb33..1c770a4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -30,6 +30,7 @@ import org.easymock.{Capture, EasyMock, IAnswer} import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock +import kafka.zk.KafkaZkClient import org.apache.kafka.common.internals.Topic import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} @@ -61,7 +62,7 @@ class GroupCoordinatorTest extends JUnitSuite { var groupCoordinator: GroupCoordinator = null var replicaManager: ReplicaManager = null var scheduler: KafkaScheduler = null - var zkUtils: ZkUtils = null + var zkClient: KafkaZkClient = null private val groupId = "groupId" private val protocolType = "consumer" @@ -85,10 +86,10 @@ class GroupCoordinatorTest extends JUnitSuite { replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) - zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator - EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) - EasyMock.replay(zkUtils) + EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) + EasyMock.replay(zkClient) timer = new MockTimer @@ -97,7 +98,7 @@ class GroupCoordinatorTest extends JUnitSuite { val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, 
config.brokerId, reaperEnabled = false) val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false) - groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time) + groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time) groupCoordinator.startup(false) // add the partition into the owned partition list http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index b437405..abedcb8 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -23,7 +23,7 @@ import kafka.common.OffsetAndMetadata import kafka.log.{Log, LogAppendInfo} import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager} import kafka.utils.TestUtils.fail -import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} +import kafka.utils.{KafkaScheduler, MockTime, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ @@ -42,13 +42,15 @@ import scala.collection.JavaConverters._ import scala.collection._ import java.util.concurrent.locks.ReentrantLock +import kafka.zk.KafkaZkClient + class GroupMetadataManagerTest { var time: MockTime = null var replicaManager: ReplicaManager = null var groupMetadataManager: GroupMetadataManager = null var scheduler: KafkaScheduler = null - var zkUtils: ZkUtils = null + var zkClient: KafkaZkClient = null var partition: Partition = null 
val groupId = "foo" @@ -74,13 +76,13 @@ class GroupMetadataManagerTest { offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator - zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) - EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) - EasyMock.replay(zkUtils) + zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) + EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) + EasyMock.replay(zkClient) time = new MockTime replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) - groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkUtils, time) + groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time) partition = EasyMock.niceMock(classOf[Partition]) } http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala index 39353b8..c5b42d4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala @@ -17,34 +17,34 @@ package kafka.coordinator.transaction import kafka.common.KafkaException -import kafka.utils.ZkUtils +import kafka.zk.KafkaZkClient import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.{After, Test} import org.junit.Assert._ class ProducerIdManagerTest { - private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + private val zkClient = 
EasyMock.createNiceMock(classOf[KafkaZkClient]) @After def tearDown(): Unit = { - EasyMock.reset(zkUtils) + EasyMock.reset(zkClient) } @Test def testGetProducerId() { var zkVersion: Option[Int] = None var data: String = null - EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] { + EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] { override def answer(): (Option[String], Int) = zkVersion.map(Some(data) -> _).getOrElse(None, 0) }).anyTimes() val capturedVersion: Capture[Int] = EasyMock.newCapture() val capturedData: Capture[String] = EasyMock.newCapture() - EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(), + EasyMock.expect(zkClient.conditionalUpdatePath(EasyMock.anyString(), EasyMock.capture(capturedData), EasyMock.capture(capturedVersion), - EasyMock.anyObject[Option[(ZkUtils, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] { + EasyMock.anyObject[Option[(KafkaZkClient, String, String) => (Boolean, Int)]])).andAnswer(new IAnswer[(Boolean, Int)] { override def answer(): (Boolean, Int) = { val newZkVersion = capturedVersion.getValue + 1 zkVersion = Some(newZkVersion) @@ -53,10 +53,10 @@ class ProducerIdManagerTest { } }).anyTimes() - EasyMock.replay(zkUtils) + EasyMock.replay(zkClient) - val manager1 = new ProducerIdManager(0, zkUtils) - val manager2 = new ProducerIdManager(1, zkUtils) + val manager1 = new ProducerIdManager(0, zkClient) + val manager2 = new ProducerIdManager(1, zkClient) val pid1 = manager1.generateProducerId() val pid2 = manager2.generateProducerId() @@ -76,15 +76,15 @@ class ProducerIdManagerTest { @Test(expected = classOf[KafkaException]) def testExceedProducerIdLimit() { - EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] { + 
EasyMock.expect(zkClient.getDataAndVersion(EasyMock.anyString)).andAnswer(new IAnswer[(Option[String], Int)] { override def answer(): (Option[String], Int) = { val json = ProducerIdManager.generateProducerIdBlockJson( ProducerIdBlock(0, Long.MaxValue - ProducerIdManager.PidBlockSize, Long.MaxValue)) (Some(json), 0) } }).anyTimes() - EasyMock.replay(zkUtils) - new ProducerIdManager(0, zkUtils) + EasyMock.replay(zkClient) + new ProducerIdManager(0, zkClient) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 7973b9a..20dfaa6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -21,8 +21,9 @@ import java.util.concurrent.locks.ReentrantLock import kafka.log.Log import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager} -import kafka.utils.{MockScheduler, Pool, ZkUtils} +import kafka.utils.{MockScheduler, Pool} import kafka.utils.TestUtils.fail +import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME import org.apache.kafka.common.protocol.Errors @@ -51,17 +52,17 @@ class TransactionStateManagerTest { val time = new MockTime() val scheduler = new MockScheduler(time) - val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient]) val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) - 
EasyMock.expect(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME)) + EasyMock.expect(zkClient.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME)) .andReturn(Some(numPartitions)) .anyTimes() - EasyMock.replay(zkUtils) + EasyMock.replay(zkClient) val txnConfig = TransactionConfig() - val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkUtils, scheduler, replicaManager, txnConfig, time) + val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time) val transactionalId1: String = "one" val transactionalId2: String = "two" @@ -82,7 +83,7 @@ class TransactionStateManagerTest { @After def tearDown() { - EasyMock.reset(zkUtils, replicaManager) + EasyMock.reset(zkClient, replicaManager) transactionManager.shutdown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/f88fdbd3/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala index 00c0a02..f2d95c2 100644 --- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala @@ -16,11 +16,14 @@ */ package kafka.zk +import kafka.common.TopicAndPartition +import kafka.utils.ZkUtils import kafka.zookeeper.ZooKeeperClient - -import org.junit.{After, Before, Test} -import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.apache.kafka.common.TopicPartition +import org.junit.Assert.{assertEquals, assertFalse, assertTrue} +import org.junit.{After, Before, Test} + +import scala.collection.mutable class KafkaZkClientTest extends ZooKeeperTestHarness { @@ -89,4 +92,69 @@ class KafkaZkClientTest extends ZooKeeperTestHarness { intercept[IllegalArgumentException](zkClient.createRecursive("create-invalid-path")) } + @Test + def 
testGetTopicPartitionCount() { + val topic = "mytest" + + // test with non-existing topic + assertTrue(zkClient.getTopicPartitionCount(topic).isEmpty) + + // create a topic path + zkClient.createRecursive(ZkUtils.getTopicPath(topic)) + + val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]() + assignment.put(new TopicAndPartition(topic, 0), Seq(0,1)) + assignment.put(new TopicAndPartition(topic, 1), Seq(0,1)) + zkClient.setTopicAssignmentRaw(topic, assignment.toMap) + + assertEquals(2, zkClient.getTopicPartitionCount(topic).get) + } + + + @Test + def testGetDataAndVersion() { + val path = "/testpath" + + // test with non-existing path + var dataAndVersion = zkClient.getDataAndVersion(path) + assertTrue(dataAndVersion._1.isEmpty) + assertEquals(-1, dataAndVersion._2) + + // create a test path + zkClient.createRecursive(path) + zkClient.conditionalUpdatePath(path, "version1", 0) + + // test with existing path + dataAndVersion = zkClient.getDataAndVersion(path) + assertEquals("version1", dataAndVersion._1.get) + assertEquals(1, dataAndVersion._2) + + zkClient.conditionalUpdatePath(path, "version2", 1) + dataAndVersion = zkClient.getDataAndVersion(path) + assertEquals("version2", dataAndVersion._1.get) + assertEquals(2, dataAndVersion._2) + } + + @Test + def testConditionalUpdatePath() { + val path = "/testconditionalpath" + + // test with non-existing path + var statusAndVersion = zkClient.conditionalUpdatePath(path, "version0", 0) + assertFalse(statusAndVersion._1) + assertEquals(-1, statusAndVersion._2) + + // create path + zkClient.createRecursive(path) + + // test with valid expected version + statusAndVersion = zkClient.conditionalUpdatePath(path, "version1", 0) + assertTrue(statusAndVersion._1) + assertEquals(1, statusAndVersion._2) + + // test with invalid expected version + statusAndVersion = zkClient.conditionalUpdatePath(path, "version2", 2) + assertFalse(statusAndVersion._1) + assertEquals(-1, statusAndVersion._2) + } } \ No newline at end of 
file