Repository: kafka Updated Branches: refs/heads/trunk f7f8e1121 -> ab6f848ba
http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index 028201f..ca8a0d6 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -68,7 +68,7 @@ class TopicFilterTest extends JUnitSuite { topicCount.getTopicCountMap.head._1 } //lets make sure that the JSON strings are escaping as we expect - //if they are not then when they get saved to zookeeper and read back out they will be broken on parse + //if they are not then when they get saved to ZooKeeper and read back out they will be broken on parse assertEquals("-\\\"-", getTopicCountMapKey("-\"-")) assertEquals("-\\\\-", getTopicCountMapKey("-\\-")) assertEquals("-\\/-", getTopicCountMapKey("-/-")) http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala index 296f4a7..81df1e1 100644 --- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala @@ -18,10 +18,12 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition -import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.log.LogConfig import kafka.server.KafkaConfig import kafka.utils.TestUtils +import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException} import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.Stat import org.easymock.EasyMock @@ -33,7 +35,7 @@ import scala.collection.mutable class PartitionStateMachineTest extends JUnitSuite { private var controllerContext: ControllerContext = null - private var mockZkUtils: KafkaControllerZkUtils = null + private var mockZkClient: KafkaZkClient = null private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null private var mockTopicDeletionManager: TopicDeletionManager = null private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = null @@ -49,12 +51,12 @@ class PartitionStateMachineTest extends JUnitSuite { def setUp(): Unit = { controllerContext = new ControllerContext controllerContext.epoch = controllerEpoch - mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils]) + mockZkClient = EasyMock.createMock(classOf[KafkaZkClient]) mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch]) mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager]) partitionState = mutable.Map.empty[TopicAndPartition, PartitionState] partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, - mockZkUtils, partitionState, mockControllerBrokerRequestBatch) + mockZkClient, partitionState, mockControllerBrokerRequestBatch) } @Test @@ -82,14 +84,14 @@ class PartitionStateMachineTest extends JUnitSuite { partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) + EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null))) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlinePartition, partitionState(partition)) } @@ -100,12 +102,12 @@ class PartitionStateMachineTest extends JUnitSuite { partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) - .andThrow(new ZookeeperClientException("test")) + EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) + .andThrow(new ZooKeeperClientException("test")) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(NewPartition, partitionState(partition)) } @@ -116,12 +118,12 @@ class PartitionStateMachineTest extends JUnitSuite { partitionState.put(partition, NewPartition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) + EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch))) .andReturn(Seq(CreateResponse(Code.NODEEXISTS, null, Some(partition), null))) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(NewPartition, partitionState(partition)) } @@ -150,22 +152,22 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) + EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)) .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) - EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlinePartition, partitionState(partition)) } @@ -182,22 +184,22 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) + EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)) .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) val leaderAndIsrAfterElection = leaderAndIsr.newLeaderAndIsr(otherBrokerId, List(otherBrokerId)) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) - EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(ControlledShutdownPartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlinePartition, partitionState(partition)) } @@ -233,23 +235,23 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) + EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)) .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) - EasyMock.expect(mockZkUtils.getLogConfigs(Seq.empty, config.originals())) + EasyMock.expect(mockZkClient.getLogConfigs(Seq.empty, config.originals())) .andReturn((Map(partition.topic -> LogConfig()), Map.empty)) val leaderAndIsrAfterElection = leaderAndIsr.newLeader(brokerId) val updatedLeaderAndIsr = leaderAndIsrAfterElection.withZkVersion(2) - EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlinePartition, partitionState(partition)) } @@ -263,14 +265,14 @@ class PartitionStateMachineTest extends JUnitSuite { controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) - .andThrow(new ZookeeperClientException("")) + EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)) + .andThrow(new ZooKeeperClientException("")) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OfflinePartition, partitionState(partition)) } @@ -285,15 +287,15 @@ class PartitionStateMachineTest extends JUnitSuite { val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) EasyMock.expect(mockControllerBrokerRequestBatch.newBatch()) - EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) + EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)) .andReturn(Seq(GetDataResponse(Code.NONODE, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy)) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OfflinePartition, partitionState(partition)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala index 0afe7c2..6363d41 100644 --- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala @@ -18,9 +18,11 @@ package kafka.controller import kafka.api.LeaderAndIsr import kafka.common.TopicAndPartition -import kafka.controller.KafkaControllerZkUtils.UpdateLeaderAndIsrResult import kafka.server.KafkaConfig import kafka.utils.TestUtils +import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode} +import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult +import kafka.zookeeper.GetDataResponse import org.apache.zookeeper.KeeperException.Code import org.apache.zookeeper.data.Stat import org.easymock.EasyMock @@ -32,7 +34,7 @@ import scala.collection.mutable class ReplicaStateMachineTest extends JUnitSuite { private var controllerContext: ControllerContext = null - private var mockZkUtils: KafkaControllerZkUtils = null + private var mockZkClient: KafkaZkClient = null private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null private var mockTopicDeletionManager: TopicDeletionManager = null private var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = null @@ -50,11 +52,11 @@ class ReplicaStateMachineTest extends JUnitSuite { def setUp(): Unit = { controllerContext = new ControllerContext controllerContext.epoch = controllerEpoch - mockZkUtils = EasyMock.createMock(classOf[KafkaControllerZkUtils]) + mockZkClient = EasyMock.createMock(classOf[KafkaZkClient]) mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch]) mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager]) replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState] - replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkUtils, + replicaStateMachine = new ReplicaStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager, mockZkClient, replicaState, mockControllerBrokerRequestBatch) } @@ -155,9 +157,9 @@ class ReplicaStateMachineTest extends JUnitSuite { EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) replicaStateMachine.handleStateChanges(replicas, OnlineReplica) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlineReplica, replicaState(replica)) } @@ -178,19 +180,19 @@ class ReplicaStateMachineTest extends JUnitSuite { val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId)) val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1) val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch) - EasyMock.expect(mockZkUtils.getTopicPartitionStatesRaw(partitions)) + EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)) .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition), TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat))) - EasyMock.expect(mockZkUtils.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch)) + EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch)) .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty)) EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false) EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId), partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager) replicaStateMachine.handleStateChanges(replicas, OfflineReplica) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch, mockTopicDeletionManager) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager) assertEquals(updatedLeaderIsrAndControllerEpoch, controllerContext.partitionLeadershipInfo(partition)) assertEquals(OfflineReplica, replicaState(replica)) } @@ -230,9 +232,9 @@ class ReplicaStateMachineTest extends JUnitSuite { EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) replicaStateMachine.handleStateChanges(replicas, OnlineReplica) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlineReplica, replicaState(replica)) } @@ -244,9 +246,9 @@ class ReplicaStateMachineTest extends JUnitSuite { EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(ReplicaDeletionStarted, replicaState(replica)) } @@ -348,9 +350,9 @@ class ReplicaStateMachineTest extends JUnitSuite { EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId), partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false)) EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch)) - EasyMock.replay(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch) replicaStateMachine.handleStateChanges(replicas, OnlineReplica) - EasyMock.verify(mockZkUtils, mockControllerBrokerRequestBatch) + EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch) assertEquals(OnlineReplica, replicaState(replica)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala b/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala deleted file mode 100644 index d7b46c7..0000000 --- a/core/src/test/scala/unit/kafka/controller/ZookeeperClientTest.scala +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.controller - -import java.net.UnknownHostException -import java.nio.charset.StandardCharsets -import java.util.UUID -import java.util.concurrent.{CountDownLatch, TimeUnit} -import javax.security.auth.login.Configuration - -import kafka.zk.ZooKeeperTestHarness -import org.apache.kafka.common.security.JaasUtils -import org.apache.zookeeper.KeeperException.Code -import org.apache.zookeeper.{CreateMode, ZooDefs} -import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue} -import org.junit.{After, Test} - -class ZookeeperClientTest extends ZooKeeperTestHarness { - private val mockPath = "/foo" - - @After - override def tearDown() { - super.tearDown() - System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) - Configuration.setConfiguration(null) - } - - @Test(expected = classOf[UnknownHostException]) - def testUnresolvableConnectString(): Unit = { - new ZookeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null) - } - - @Test(expected = classOf[ZookeeperClientTimeoutException]) - def testConnectionTimeout(): Unit = { - zookeeper.shutdown() - new ZookeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null) - } - - @Test - def testConnection(): Unit = { - new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - } - - @Test - def testDeleteNonExistentZNode(): Unit = { - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1)) - assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode) - } - - @Test - def testDeleteExistingZNode(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1)) - assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode) - } - - @Test - def testExistsNonExistentZNode(): Unit = { - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath)) - assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode) - } - - @Test - def testExistsExistingZNode(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val existsResponse = zookeeperClient.handleRequest(ExistsRequest(mockPath)) - assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode) - } - - @Test - def testGetDataNonExistentZNode(): Unit = { - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath)) - assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode) - } - - @Test - def testGetDataExistingZNode(): Unit = { - import scala.collection.JavaConverters._ - val data = bytes - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, - CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath)) - assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) - assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data) - } - - @Test - def testSetDataNonExistentZNode(): Unit = { - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) - assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode) - } - - @Test - def testSetDataExistingZNode(): Unit = { - import scala.collection.JavaConverters._ - val data = bytes - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], - ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, data, -1)) - assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode) - val getDataResponse = zookeeperClient.handleRequest(GetDataRequest(mockPath)) - assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) - assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data) - } - - @Test - def testGetAclNonExistentZNode(): Unit = { - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath)) - assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode) - } - - @Test - def testGetAclExistingZNode(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val getAclResponse = zookeeperClient.handleRequest(GetAclRequest(mockPath)) - assertEquals("Response code for getAcl should be OK", Code.OK, getAclResponse.resultCode) - assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl) - } - - @Test - def testSetAclNonExistentZNode(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val setAclResponse = zookeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1)) - assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode) - } - - @Test - def testGetChildrenNonExistentZNode(): Unit = { - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) - assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode) - } - - @Test - def testGetChildrenExistingZNode(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], - ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) - assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) - assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children) - } - - @Test - def testGetChildrenExistingZNodeWithChildren(): Unit = { - import scala.collection.JavaConverters._ - val child1 = "child1" - val child2 = "child2" - val child1Path = mockPath + "/" + child1 - val child2Path = mockPath + "/" + child2 - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], - ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], - ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode) - val createResponseChild2 = zookeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte], - ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create child2 should be OK", Code.OK, createResponseChild2.resultCode) - - val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) - assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) - assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted) - } - - @Test - def testPipelinedGetData(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - val createResponses = createRequests.map(zookeeperClient.handleRequest) - createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)) - val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x)) - val getDataResponses = zookeeperClient.handleRequests(getDataRequests) - getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK, - getDataResponse.resultCode)) - getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) => - assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) - assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.data))) - } - } - - @Test - def testMixedPipeline(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], - ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - val getDataRequest = GetDataRequest(mockPath) - val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1) - val responses = zookeeperClient.handleRequests(Seq(getDataRequest, setDataRequest)) - assertEquals("Response code for getData should be OK", Code.OK, responses.head.resultCode) - assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data) - assertEquals("Response code for setData should be NONODE", Code.NONODE, responses.last.resultCode) - } - - @Test - def testZNodeChangeHandlerForCreation(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) - val zNodeChangeHandler = new ZNodeChangeHandler { - override def handleCreation(): Unit = { - znodeChangeHandlerCountDownLatch.countDown() - } - override val path: String = mockPath - } - - zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsRequest = ExistsRequest(mockPath) - val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) - val responses = zookeeperClient.handleRequests(Seq(existsRequest, createRequest)) - assertEquals("Response code for exists should be NONODE", Code.NONODE, responses.head.resultCode) - assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) - assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) - } - - @Test - def testZNodeChangeHandlerForDeletion(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) - val zNodeChangeHandler = new ZNodeChangeHandler { - override def handleDeletion(): Unit = { - znodeChangeHandlerCountDownLatch.countDown() - } - override val path: String = mockPath - } - - zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsRequest = ExistsRequest(mockPath) - val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) - val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest)) - assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) - assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode) - val deleteResponse = zookeeperClient.handleRequest(DeleteRequest(mockPath, -1)) - assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode) - assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) - } - - @Test - def testZNodeChangeHandlerForDataChange(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) - val zNodeChangeHandler = new ZNodeChangeHandler { - override def handleDataChange(): Unit = { - znodeChangeHandlerCountDownLatch.countDown() - } - override val path: String = mockPath - } - - zookeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) - val existsRequest = ExistsRequest(mockPath) - val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) - val responses = zookeeperClient.handleRequests(Seq(createRequest, existsRequest)) - assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) - assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode) - val setDataResponse = zookeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) - assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode) - assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) - } - - @Test - def testZNodeChildChangeHandlerForChildChange(): Unit = { - import scala.collection.JavaConverters._ - val zookeeperClient = new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) - val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1) - val zNodeChildChangeHandler = new ZNodeChildChangeHandler { - override def handleChildChange(): Unit = { - zNodeChildChangeHandlerCountDownLatch.countDown() - } - override val path: String = mockPath - } - - val child1 = "child1" - val child1Path = mockPath + "/" + child1 - val createResponse = zookeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) - zookeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler) - val getChildrenResponse = zookeeperClient.handleRequest(GetChildrenRequest(mockPath)) - assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) - val createResponseChild1 = zookeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) - assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode) - assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) - } - - @Test - def testStateChangeHandlerForAuthFailure(): Unit = { - System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf") - val stateChangeHandlerCountDownLatch = new CountDownLatch(1) - val stateChangeHandler = new StateChangeHandler { - override def onAuthFailure(): Unit = { - stateChangeHandlerCountDownLatch.countDown() - } - } - new ZookeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler) - assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) - } - - private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8) -} http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index fb76ca1..cfc325d 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -86,7 +86,7 @@ class AutoOffsetResetTest extends KafkaServerTestHarness with Logging { for(_ <- 0 until numMessages) producer.send(new KeyedMessage[String, Array[Byte]](topic, topic, "test".getBytes)) - // update offset in zookeeper for consumer to jump "forward" in time + // update offset in ZooKeeper for consumer to jump "forward" in time val dirs = new ZKGroupTopicDirs(group, topic) val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer) consumerProps.put("auto.offset.reset", resetTo) http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index ca17b9a..22a06e7 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -239,7 +239,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer, resource) assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource))) - //test removing last acl also deletes zookeeper path + //test removing last acl also deletes ZooKeeper path acls = changeAclAndVerify(Set.empty[Acl], Set(acl1), Set.empty[Acl]) changeAclAndVerify(acls, Set.empty[Acl], acls) assertTrue(!zkUtils.pathExists(simpleAclAuthorizer.toResourcePath(resource))) @@ -405,7 +405,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { def testHighConcurrencyDeletionOfResourceAcls() { val acl = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username), Allow, WildCardHost, All) - // Alternate authorizer to keep adding and removing zookeeper path + // Alternate authorizer to keep adding and removing ZooKeeper path val concurrentFuctions = (0 to 50).map { _ => () => { simpleAclAuthorizer.addAcls(Set(acl), resource) http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 4196bc1..31d32d2 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -84,7 +84,7 @@ class ClientQuotaManagerTest { /** * Tests parsing for <client-id> quotas. - * Quota overrides persisted in Zookeeper in /config/clients/<client-id>, default persisted in /config/clients/<default> + * Quota overrides persisted in ZooKeeper in /config/clients/<client-id>, default persisted in /config/clients/<default> */ @Test def testClientIdQuotaParsing() { @@ -97,7 +97,7 @@ class ClientQuotaManagerTest { /** * Tests parsing for <user> quotas. - * Quota overrides persisted in Zookeeper in /config/users/<user>, default persisted in /config/users/<default> + * Quota overrides persisted in ZooKeeper in /config/users/<user>, default persisted in /config/users/<default> */ @Test def testUserQuotaParsing() { @@ -111,7 +111,7 @@ class ClientQuotaManagerTest { /** * Tests parsing for <user, client-id> quotas. - * Quotas persisted in Zookeeper in /config/users/<user>/clients/<client-id>, default in /config/users/<default>/clients/<default> + * Quotas persisted in ZooKeeper in /config/users/<user>/clients/<client-id>, default in /config/users/<default>/clients/<default> */ @Test def testUserClientIdQuotaParsing() { http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a86c160..60f403d 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package unit.kafka.server +package kafka.server import java.lang.{Long => JLong} import java.net.InetAddress @@ -30,7 +30,6 @@ import kafka.log.{Log, TimestampOffset} import kafka.network.RequestChannel import kafka.security.auth.Authorizer import kafka.server.QuotaFactory.QuotaManagers -import kafka.server._ import kafka.utils.{MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.UnsupportedVersionException http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala index 8786b19..306dbc0 100755 --- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala @@ -80,7 +80,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val topic = topicPartition.split("-").head val part = Integer.valueOf(topicPartition.split("-").last).intValue - // setup brokers in zookeeper as owners of partitions for this test + // setup brokers in ZooKeeper as owners of partitions for this test AdminUtils.createTopic(zkUtils, topic, 1, 1) val logManager = server.getLogManager @@ -115,7 +115,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val topic = topicPartition.split("-").head val part = Integer.valueOf(topicPartition.split("-").last).intValue - // setup brokers in zookeeper as owners of partitions for this test + // setup brokers in ZooKeeper as owners of partitions for this test AdminUtils.createTopic(zkUtils, topic, 1, 1) val logManager = server.getLogManager @@ -154,7 +154,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val topic = topicPartition.split("-").head - // setup brokers in zookeeper as owners of partitions for this test + // setup brokers in ZooKeeper as owners of partitions for this test createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) var offsetChanged = false @@ -178,7 +178,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val topic = topicPartition.split("-").head val part = Integer.valueOf(topicPartition.split("-").last).intValue - // setup brokers in zookeeper as owners of partitions for this test + // setup brokers in ZooKeeper as owners of partitions for this test AdminUtils.createTopic(zkUtils, topic, 3, 1) val logManager = server.getLogManager @@ -207,7 +207,7 @@ class LogOffsetTest extends ZooKeeperTestHarness { val topic = topicPartition.split("-").head val part = Integer.valueOf(topicPartition.split("-").last).intValue - // setup brokers in zookeeper as owners of partitions for this test + // setup brokers in ZooKeeper as owners of partitions for this test AdminUtils.createTopic(zkUtils, topic, 3, 1) val logManager = server.getLogManager http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index 9c51a10..fa373f5 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -139,7 +139,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { updateProducer() leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId) - assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0", + assertTrue("Leader must remain on broker 1, in case of ZooKeeper session expiration it can move to broker 0", leader == 0 || leader == 1) assertEquals(hw, hwFile1.read.getOrElse(topicPartition, 0L)) @@ -150,7 +150,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { server2.startup() updateProducer() leader = waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, oldLeaderOpt = Some(leader)) - assertTrue("Leader must remain on broker 0, in case of zookeeper session expiration it can move to broker 1", + assertTrue("Leader must remain on broker 0, in case of ZooKeeper session expiration it can move to broker 1", leader == 0 || leader == 1) sendMessages(1) http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 2920730..b8d0afb 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -74,7 +74,7 @@ object TestUtils extends Logging { /** Port to use for unit tests that mock/don't require a real ZK server. */ val MockZkPort = 1 - /** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */ + /** ZooKeeper connection string to use for unit tests that mock/don't require a real ZK server. */ val MockZkConnect = "127.0.0.1:" + MockZkPort private val transactionStatusKey = "transactionStatus" @@ -273,7 +273,7 @@ object TestUtils extends Logging { } /** - * Create a topic in zookeeper. + * Create a topic in ZooKeeper. * Wait until the leader is elected and the metadata is propagated to all brokers. * Return the leader for each partition. */ @@ -293,7 +293,7 @@ object TestUtils extends Logging { } /** - * Create a topic in zookeeper using a customized replica assignment. + * Create a topic in ZooKeeper using a customized replica assignment. * Wait until the leader is elected and the metadata is propagated to all brokers. * Return the leader for each partition. */ @@ -1172,7 +1172,7 @@ object TestUtils extends Logging { TestUtils.waitUntilTrue(() => servers.forall(server => topicPartitions.forall(tp => server.replicaManager.getPartition(tp).isEmpty)), "Replica manager's should have deleted all of this topic's partitions") - // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper + // ensure that logs from all replicas are deleted if delete topic is marked successful in ZooKeeper assertTrue("Replica logs not deleted after delete topic is complete", servers.forall(server => topicPartitions.forall(tp => server.getLogManager.getLog(tp).isEmpty))) // ensure that topic is removed from all cleaner offsets http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala index 22465ea..36d477c 100755 --- a/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala +++ b/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala @@ -25,19 +25,19 @@ import java.net.InetSocketAddress import kafka.utils.CoreUtils import org.apache.kafka.common.utils.Utils -class EmbeddedZookeeper() { +class EmbeddedZooKeeper() { val snapshotDir = TestUtils.tempDir() val logDir = TestUtils.tempDir() val tickTime = 500 - val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime) + val zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, tickTime) val factory = new NIOServerCnxnFactory() private val addr = new InetSocketAddress("127.0.0.1", TestUtils.RandomPort) factory.configure(addr, 0) - factory.startup(zookeeper) - val port = zookeeper.getClientPort() + factory.startup(zooKeeperServer) + val port = zooKeeperServer.getClientPort() def shutdown() { - CoreUtils.swallow(zookeeper.shutdown()) + CoreUtils.swallow(zooKeeperServer.shutdown()) CoreUtils.swallow(factory.shutdown()) def isDown(): Boolean = { http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala index 07978b9..0cf836d 100644 --- a/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala +++ b/core/src/test/scala/unit/kafka/zk/ZKPathTest.scala @@ -38,7 +38,7 @@ class ZKPathTest extends ZooKeeperTestHarness { try { zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createPersistentPath(path) - fail("Failed to throw ConfigException for missing zookeeper root node") + fail("Failed to throw ConfigException for missing ZooKeeper root node") } catch { case _: ConfigException => } @@ -62,7 +62,7 @@ class ZKPathTest extends ZooKeeperTestHarness { try { zkUtils.zkPath.resetNamespaceCheckedState zkUtils.makeSurePersistentPathExists(path) - fail("Failed to throw ConfigException for missing zookeeper root node") + fail("Failed to throw ConfigException for missing ZooKeeper root node") } catch { case _: ConfigException => } @@ -86,7 +86,7 @@ class ZKPathTest extends ZooKeeperTestHarness { try { zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createEphemeralPathExpectConflict(path, "somedata") - fail("Failed to throw ConfigException for missing zookeeper root node") + fail("Failed to throw ConfigException for missing ZooKeeper root node") } catch { case _: ConfigException => } @@ -111,7 +111,7 @@ class ZKPathTest extends ZooKeeperTestHarness { try { zkUtils.zkPath.resetNamespaceCheckedState zkUtils.createSequentialPersistentPath(path) - fail("Failed to throw ConfigException for missing zookeeper root node") + fail("Failed to throw ConfigException for missing ZooKeeper root node") } catch { case _: ConfigException => } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala index 0a7e631..6bedba3 100755 --- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala +++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala @@ -41,14 +41,14 @@ abstract class ZooKeeperTestHarness extends JUnitSuite with Logging { protected val zkAclsEnabled: Option[Boolean] = None var zkUtils: ZkUtils = null - var zookeeper: EmbeddedZookeeper = null + var zookeeper: EmbeddedZooKeeper = null def zkPort: Int = zookeeper.port def zkConnect: String = s"127.0.0.1:$zkPort" @Before def setUp() { - zookeeper = new EmbeddedZookeeper() + zookeeper = new EmbeddedZooKeeper() zkUtils = ZkUtils(zkConnect, zkSessionTimeout, zkConnectionTimeout, zkAclsEnabled.getOrElse(JaasUtils.isZkSecurityEnabled())) } http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala new file mode 100644 index 0000000..d595221 --- /dev/null +++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.zookeeper + +import java.net.UnknownHostException +import java.nio.charset.StandardCharsets +import java.util.UUID +import java.util.concurrent.{CountDownLatch, TimeUnit} +import javax.security.auth.login.Configuration + +import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.security.JaasUtils +import org.apache.zookeeper.KeeperException.Code +import org.apache.zookeeper.{CreateMode, ZooDefs} +import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue} +import org.junit.{After, Test} + +class ZooKeeperClientTest extends ZooKeeperTestHarness { + private val mockPath = "/foo" + + @After + override def tearDown() { + super.tearDown() + System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) + Configuration.setConfiguration(null) + } + + @Test(expected = classOf[UnknownHostException]) + def testUnresolvableConnectString(): Unit = { + new ZooKeeperClient("some.invalid.hostname.foo.bar.local", -1, -1, null) + } + + @Test(expected = classOf[ZooKeeperClientTimeoutException]) + def testConnectionTimeout(): Unit = { + zookeeper.shutdown() + new ZooKeeperClient(zkConnect, zkSessionTimeout, connectionTimeoutMs = 100, null) + } + + @Test + def testConnection(): Unit = { + new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + } + + @Test + def testDeleteNonExistentZNode(): Unit = { + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1)) + assertEquals("Response code should be NONODE", Code.NONODE, deleteResponse.resultCode) + } + + @Test + def testDeleteExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1)) + assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode) + } + + @Test + def testExistsNonExistentZNode(): Unit = { + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, existsResponse.resultCode) + } + + @Test + def testExistsExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val existsResponse = zooKeeperClient.handleRequest(ExistsRequest(mockPath)) + assertEquals("Response code for exists should be OK", Code.OK, existsResponse.resultCode) + } + + @Test + def testGetDataNonExistentZNode(): Unit = { + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, getDataResponse.resultCode) + } + + @Test + def testGetDataExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val data = bytes + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, + CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath)) + assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) + assertArrayEquals("Data for getData should match created znode data", data, getDataResponse.data) + } + + @Test + def testSetDataNonExistentZNode(): Unit = { + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) + assertEquals("Response code should be NONODE", Code.NONODE, setDataResponse.resultCode) + } + + @Test + def testSetDataExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val data = bytes + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, data, -1)) + assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode) + val getDataResponse = zooKeeperClient.handleRequest(GetDataRequest(mockPath)) + assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) + assertArrayEquals("Data for getData should match setData's data", data, getDataResponse.data) + } + + @Test + def testGetAclNonExistentZNode(): Unit = { + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, getAclResponse.resultCode) + } + + @Test + def testGetAclExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getAclResponse = zooKeeperClient.handleRequest(GetAclRequest(mockPath)) + assertEquals("Response code for getAcl should be OK", Code.OK, getAclResponse.resultCode) + assertEquals("ACL should be " + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, getAclResponse.acl) + } + + @Test + def testSetAclNonExistentZNode(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val setAclResponse = zooKeeperClient.handleRequest(SetAclRequest(mockPath, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, -1)) + assertEquals("Response code should be NONODE", Code.NONODE, setAclResponse.resultCode) + } + + @Test + def testGetChildrenNonExistentZNode(): Unit = { + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code should be NONODE", Code.NONODE, getChildrenResponse.resultCode) + } + + @Test + def testGetChildrenExistingZNode(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) + assertEquals("getChildren should return no children", Seq.empty[String], getChildrenResponse.children) + } + + @Test + def testGetChildrenExistingZNodeWithChildren(): Unit = { + import scala.collection.JavaConverters._ + val child1 = "child1" + val child2 = "child2" + val child1Path = mockPath + "/" + child1 + val child2Path = mockPath + "/" + child2 + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val createResponseChild1 = zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode) + val createResponseChild2 = zooKeeperClient.handleRequest(CreateRequest(child2Path, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create child2 should be OK", Code.OK, createResponseChild2.resultCode) + + val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) + assertEquals("getChildren should return two children", Seq(child1, child2), getChildrenResponse.children.sorted) + } + + @Test + def testPipelinedGetData(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createRequests = (1 to 3).map(x => CreateRequest("/" + x, (x * 2).toString.getBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + val createResponses = createRequests.map(zooKeeperClient.handleRequest) + createResponses.foreach(createResponse => assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode)) + val getDataRequests = (1 to 3).map(x => GetDataRequest("/" + x)) + val getDataResponses = zooKeeperClient.handleRequests(getDataRequests) + getDataResponses.foreach(getDataResponse => assertEquals("Response code for getData should be OK", Code.OK, + getDataResponse.resultCode)) + getDataResponses.zipWithIndex.foreach { case (getDataResponse, i) => + assertEquals("Response code for getData should be OK", Code.OK, getDataResponse.resultCode) + assertEquals("Data for getData should match", ((i + 1) * 2), Integer.valueOf(new String(getDataResponse.data))) + } + } + + @Test + def testMixedPipeline(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], + ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + val getDataRequest = GetDataRequest(mockPath) + val setDataRequest = SetDataRequest("/nonexistent", Array.empty[Byte], -1) + val responses = zooKeeperClient.handleRequests(Seq(getDataRequest, setDataRequest)) + assertEquals("Response code for getData should be OK", Code.OK, responses.head.resultCode) + assertArrayEquals("Data for getData should be empty", Array.empty[Byte], responses.head.asInstanceOf[GetDataResponse].data) + assertEquals("Response code for setData should be NONODE", Code.NONODE, responses.last.resultCode) + } + + @Test + def testZNodeChangeHandlerForCreation(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleCreation(): Unit = { + znodeChangeHandlerCountDownLatch.countDown() + } + override val path: String = mockPath + } + + zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) + val existsRequest = ExistsRequest(mockPath) + val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) + val responses = zooKeeperClient.handleRequests(Seq(existsRequest, createRequest)) + assertEquals("Response code for exists should be NONODE", Code.NONODE, responses.head.resultCode) + assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) + assertTrue("Failed to receive create notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testZNodeChangeHandlerForDeletion(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleDeletion(): Unit = { + znodeChangeHandlerCountDownLatch.countDown() + } + override val path: String = mockPath + } + + zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) + val existsRequest = ExistsRequest(mockPath) + val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) + val responses = zooKeeperClient.handleRequests(Seq(createRequest, existsRequest)) + assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) + assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode) + val deleteResponse = zooKeeperClient.handleRequest(DeleteRequest(mockPath, -1)) + assertEquals("Response code for delete should be OK", Code.OK, deleteResponse.resultCode) + assertTrue("Failed to receive delete notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testZNodeChangeHandlerForDataChange(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val znodeChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChangeHandler = new ZNodeChangeHandler { + override def handleDataChange(): Unit = { + znodeChangeHandlerCountDownLatch.countDown() + } + override val path: String = mockPath + } + + zooKeeperClient.registerZNodeChangeHandler(zNodeChangeHandler) + val existsRequest = ExistsRequest(mockPath) + val createRequest = CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT) + val responses = zooKeeperClient.handleRequests(Seq(createRequest, existsRequest)) + assertEquals("Response code for create should be OK", Code.OK, responses.last.resultCode) + assertEquals("Response code for exists should be OK", Code.OK, responses.head.resultCode) + val setDataResponse = zooKeeperClient.handleRequest(SetDataRequest(mockPath, Array.empty[Byte], -1)) + assertEquals("Response code for setData should be OK", Code.OK, setDataResponse.resultCode) + assertTrue("Failed to receive data change notification", znodeChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testZNodeChildChangeHandlerForChildChange(): Unit = { + import scala.collection.JavaConverters._ + val zooKeeperClient = new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, null) + val zNodeChildChangeHandlerCountDownLatch = new CountDownLatch(1) + val zNodeChildChangeHandler = new ZNodeChildChangeHandler { + override def handleChildChange(): Unit = { + zNodeChildChangeHandlerCountDownLatch.countDown() + } + override val path: String = mockPath + } + + val child1 = "child1" + val child1Path = mockPath + "/" + child1 + val createResponse = zooKeeperClient.handleRequest(CreateRequest(mockPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create should be OK", Code.OK, createResponse.resultCode) + zooKeeperClient.registerZNodeChildChangeHandler(zNodeChildChangeHandler) + val getChildrenResponse = zooKeeperClient.handleRequest(GetChildrenRequest(mockPath)) + assertEquals("Response code for getChildren should be OK", Code.OK, getChildrenResponse.resultCode) + val createResponseChild1 = zooKeeperClient.handleRequest(CreateRequest(child1Path, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE.asScala, CreateMode.PERSISTENT)) + assertEquals("Response code for create child1 should be OK", Code.OK, createResponseChild1.resultCode) + assertTrue("Failed to receive child change notification", zNodeChildChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + @Test + def testStateChangeHandlerForAuthFailure(): Unit = { + System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "no-such-file-exists.conf") + val stateChangeHandlerCountDownLatch = new CountDownLatch(1) + val stateChangeHandler = new StateChangeHandler { + override def onAuthFailure(): Unit = { + stateChangeHandlerCountDownLatch.countDown() + } + } + new ZooKeeperClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, stateChangeHandler) + assertTrue("Failed to receive auth failed notification", stateChangeHandlerCountDownLatch.await(5, TimeUnit.SECONDS)) + } + + private def bytes = UUID.randomUUID().toString.getBytes(StandardCharsets.UTF_8) +} http://git-wip-us.apache.org/repos/asf/kafka/blob/ab6f848b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java index 367e489..0145827 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java @@ -20,7 +20,7 @@ import kafka.server.KafkaConfig$; import kafka.server.KafkaServer; import kafka.utils.MockTime; import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; +import kafka.zk.EmbeddedZooKeeper; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.security.JaasUtils; @@ -47,7 +47,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected private static final int TOPIC_CREATION_TIMEOUT = 30000; private static final int TOPIC_DELETION_TIMEOUT = 30000; - private EmbeddedZookeeper zookeeper = null; + private EmbeddedZooKeeper zookeeper = null; private final KafkaEmbedded[] brokers; private ZkUtils zkUtils = null; @@ -84,7 +84,7 @@ public class EmbeddedKafkaCluster extends ExternalResource { public void start() throws IOException, InterruptedException { log.debug("Initiating embedded Kafka cluster startup"); log.debug("Starting a ZooKeeper instance"); - zookeeper = new EmbeddedZookeeper(); + zookeeper = new EmbeddedZooKeeper(); log.debug("ZooKeeper instance is running at {}", zKConnectString()); zkUtils = ZkUtils.apply(
