Repository: kafka Updated Branches: refs/heads/trunk c30ee50d8 -> c39e79bb5
http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e363e27..c8ca2a3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -10,15 +10,14 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package integration.kafka.api +package kafka.api import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.nio.ByteBuffer import java.util.concurrent.ExecutionException -import java.util.{ArrayList, Properties} +import java.util.{ArrayList, Collections, Properties} -import kafka.api.RequestKeys import kafka.cluster.EndPoint import kafka.common.{ErrorMapping, TopicAndPartition} import kafka.coordinator.GroupCoordinator @@ -26,15 +25,13 @@ import kafka.integration.KafkaServerTestHarness import kafka.security.auth._ import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.clients.consumer.{OffsetAndMetadata, Consumer, ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{ApiException, AuthorizationException, TimeoutException} -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} -import org.apache.kafka.common.requests.FetchRequest.PartitionData -import org.apache.kafka.common.requests.UpdateMetadataRequest.PartitionState +import org.apache.kafka.common.errors._ +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.{TopicPartition, requests} import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} @@ -76,42 +73,40 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { val endPoint = new EndPoint("localhost", 0, SecurityProtocol.PLAINTEXT) - var RequestKeyToRequest: mutable.LinkedHashMap[Short, AbstractRequest] = null - val RequestKeyToResponseDeserializer: Map[Short, Class[_ <: Any]] = - Map(RequestKeys.MetadataKey -> classOf[MetadataResponse], - RequestKeys.ProduceKey -> classOf[ProduceResponse], - RequestKeys.FetchKey -> classOf[FetchResponse], - RequestKeys.OffsetsKey -> classOf[ListOffsetResponse], - RequestKeys.OffsetCommitKey -> classOf[OffsetCommitResponse], - RequestKeys.OffsetFetchKey -> classOf[OffsetFetchResponse], - RequestKeys.GroupCoordinatorKey -> classOf[GroupCoordinatorResponse], - RequestKeys.UpdateMetadataKey -> classOf[UpdateMetadataResponse], + Map(RequestKeys.MetadataKey -> classOf[requests.MetadataResponse], + RequestKeys.ProduceKey -> classOf[requests.ProduceResponse], + RequestKeys.FetchKey -> classOf[requests.FetchResponse], + RequestKeys.OffsetsKey -> classOf[requests.ListOffsetResponse], + RequestKeys.OffsetCommitKey -> classOf[requests.OffsetCommitResponse], + RequestKeys.OffsetFetchKey -> classOf[requests.OffsetFetchResponse], + RequestKeys.GroupCoordinatorKey -> classOf[requests.GroupCoordinatorResponse], + RequestKeys.UpdateMetadataKey -> classOf[requests.UpdateMetadataResponse], RequestKeys.JoinGroupKey -> classOf[JoinGroupResponse], RequestKeys.SyncGroupKey -> classOf[SyncGroupResponse], RequestKeys.HeartbeatKey -> classOf[HeartbeatResponse], RequestKeys.LeaveGroupKey -> classOf[LeaveGroupResponse], - RequestKeys.LeaderAndIsrKey -> classOf[LeaderAndIsrResponse], - RequestKeys.StopReplicaKey -> classOf[StopReplicaResponse], - RequestKeys.ControlledShutdownKey -> classOf[ControlledShutdownResponse] + RequestKeys.LeaderAndIsrKey -> classOf[requests.LeaderAndIsrResponse], + RequestKeys.StopReplicaKey -> classOf[requests.StopReplicaResponse], + RequestKeys.ControlledShutdownKey -> classOf[requests.ControlledShutdownResponse] ) val RequestKeyToErrorCode = Map[Short, (Nothing) => Short]( - RequestKeys.MetadataKey -> ((resp: MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()), - RequestKeys.ProduceKey -> ((resp: ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.FetchKey -> ((resp: FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.OffsetsKey -> ((resp: ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.OffsetCommitKey -> ((resp: OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), - RequestKeys.OffsetFetchKey -> ((resp: OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), - RequestKeys.GroupCoordinatorKey -> ((resp: GroupCoordinatorResponse) => resp.errorCode()), - RequestKeys.UpdateMetadataKey -> ((resp: UpdateMetadataResponse) => resp.errorCode()), + RequestKeys.MetadataKey -> ((resp: requests.MetadataResponse) => resp.errors().asScala.find(_._1 == topic).getOrElse(("test", Errors.NONE))._2.code()), + RequestKeys.ProduceKey -> ((resp: requests.ProduceResponse) => resp.responses().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.FetchKey -> ((resp: requests.FetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.OffsetsKey -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.OffsetCommitKey -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2), + RequestKeys.OffsetFetchKey -> ((resp: requests.OffsetFetchResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.errorCode), + RequestKeys.GroupCoordinatorKey -> ((resp: requests.GroupCoordinatorResponse) => resp.errorCode()), + RequestKeys.UpdateMetadataKey -> ((resp: requests.UpdateMetadataResponse) => resp.errorCode()), RequestKeys.JoinGroupKey -> ((resp: JoinGroupResponse) => resp.errorCode()), RequestKeys.SyncGroupKey -> ((resp: SyncGroupResponse) => resp.errorCode()), RequestKeys.HeartbeatKey -> ((resp: HeartbeatResponse) => resp.errorCode()), RequestKeys.LeaveGroupKey -> ((resp: LeaveGroupResponse) => resp.errorCode()), - RequestKeys.LeaderAndIsrKey -> ((resp: LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2), - RequestKeys.StopReplicaKey -> ((resp: StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2), - RequestKeys.ControlledShutdownKey -> ((resp: ControlledShutdownResponse) => resp.errorCode()) + RequestKeys.LeaderAndIsrKey -> ((resp: requests.LeaderAndIsrResponse) => resp.responses().asScala.find(_._1 == tp).get._2), + RequestKeys.StopReplicaKey -> ((resp: requests.StopReplicaResponse) => resp.responses().asScala.find(_._1 == tp).get._2), + RequestKeys.ControlledShutdownKey -> ((resp: requests.ControlledShutdownResponse) => resp.errorCode()) ) val RequestKeysToAcls = Map[Short, Map[Resource, Set[Acl]]]( @@ -155,41 +150,6 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { servers.head.consumerCoordinator.offsetsTopicConfigs) // create the test topic with all the brokers as replicas TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers) - - val joinReq = new JoinGroupRequest(group, 30000, JoinGroupRequest.UNKNOWN_MEMBER_ID, "consumer", - List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) - - //we have to get a join call so the group is created and we get back a memberId - addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) - val socket = new Socket("localhost", servers.head.boundPort()) - val joinResponse = sendRequestAndVerifyResponseErrorCode(socket, RequestKeys.JoinGroupKey, joinReq, ErrorMapping.NoError).asInstanceOf[JoinGroupResponse] - val memberId = joinResponse.memberId() - - //remove group acls - removeAndVerifyAcls(GroupReadAcl(groupResource), groupResource) - - RequestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest]( - RequestKeys.MetadataKey -> new MetadataRequest(List(topic).asJava), - RequestKeys.ProduceKey -> new ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava), - RequestKeys.FetchKey -> new FetchRequest(5000, 100, Map(tp -> new PartitionData(0, 100)).asJava), - RequestKeys.OffsetsKey -> new ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava), - RequestKeys.OffsetFetchKey -> new OffsetFetchRequest(group, List(tp).asJava), - RequestKeys.GroupCoordinatorKey -> new GroupCoordinatorRequest(group), - RequestKeys.UpdateMetadataKey -> new UpdateMetadataRequest(brokerId, Int.MaxValue, - Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, - Set(new UpdateMetadataRequest.Broker(brokerId, Map(SecurityProtocol.PLAINTEXT -> new UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava), - RequestKeys.JoinGroupKey -> new JoinGroupRequest(group, 30000, memberId, "consumer", - List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava), - RequestKeys.SyncGroupKey -> new SyncGroupRequest(group, 1, memberId, Map(memberId -> ByteBuffer.wrap("test".getBytes())).asJava), - RequestKeys.OffsetCommitKey -> new OffsetCommitRequest(group, 1, memberId, 1000, Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava), - RequestKeys.HeartbeatKey -> new HeartbeatRequest(group, 1, memberId), - RequestKeys.LeaveGroupKey -> new LeaveGroupRequest(group, memberId), - RequestKeys.LeaderAndIsrKey -> new LeaderAndIsrRequest(brokerId, Int.MaxValue, - Map(tp -> new LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, - Set(new LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava), - RequestKeys.StopReplicaKey -> new StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava), - RequestKeys.ControlledShutdownKey -> new ControlledShutdownRequest(brokerId) - ) } @After @@ -198,108 +158,346 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { super.tearDown() } + private def createMetadataRequest = { + new requests.MetadataRequest(List(topic).asJava) + } + + private def createProduceRequest = { + new requests.ProduceRequest(1, 5000, collection.mutable.Map(tp -> ByteBuffer.wrap("test".getBytes)).asJava) + } + + private def createFetchRequest = { + new requests.FetchRequest(5000, 100, Map(tp -> new requests.FetchRequest.PartitionData(0, 100)).asJava) + } + + private def createListOffsetsRequest = { + new requests.ListOffsetRequest(Map(tp -> new ListOffsetRequest.PartitionData(0, 100)).asJava) + } + + private def createOffsetFetchRequest = { + new requests.OffsetFetchRequest(group, List(tp).asJava) + } + + private def createGroupCoordinatorRequest = { + new requests.GroupCoordinatorRequest(group) + } + + private def createUpdateMetadataRequest = { + val partitionState = Map(tp -> new requests.UpdateMetadataRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava + val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, + Map(SecurityProtocol.PLAINTEXT -> new requests.UpdateMetadataRequest.EndPoint("localhost", 0)).asJava)).asJava + new requests.UpdateMetadataRequest(brokerId, Int.MaxValue, partitionState, brokers) + } + + private def createJoinGroupRequest = { + new JoinGroupRequest(group, 30000, "", "consumer", + List( new JoinGroupRequest.GroupProtocol("consumer-range",ByteBuffer.wrap("test".getBytes()))).asJava) + } + + private def createSyncGroupRequest = { + new SyncGroupRequest(group, 1, "", Map[String, ByteBuffer]().asJava) + } + + private def createOffsetCommitRequest = { + new requests.OffsetCommitRequest(group, 1, "", 1000, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava) + } + + private def createHeartbeatRequest = { + new HeartbeatRequest(group, 1, "") + } + + private def createLeaveGroupRequest = { + new LeaveGroupRequest(group, "") + } + + private def createLeaderAndIsrRequest = { + new requests.LeaderAndIsrRequest(brokerId, Int.MaxValue, + Map(tp -> new requests.LeaderAndIsrRequest.PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Set(brokerId).asJava)).asJava, + Set(new requests.LeaderAndIsrRequest.EndPoint(brokerId,"localhost", 0)).asJava) + } + + private def createStopReplicaRequest = { + new requests.StopReplicaRequest(brokerId, Int.MaxValue, true, Set(tp).asJava) + } + + private def createControlledShutdownRequest = { + new requests.ControlledShutdownRequest(brokerId) + } + @Test def testAuthorization() { + val requestKeyToRequest = mutable.LinkedHashMap[Short, AbstractRequest]( + RequestKeys.MetadataKey -> createMetadataRequest, + RequestKeys.ProduceKey -> createProduceRequest, + RequestKeys.FetchKey -> createFetchRequest, + RequestKeys.OffsetsKey -> createListOffsetsRequest, + RequestKeys.OffsetFetchKey -> createOffsetFetchRequest, + RequestKeys.GroupCoordinatorKey -> createGroupCoordinatorRequest, + RequestKeys.UpdateMetadataKey -> createUpdateMetadataRequest, + RequestKeys.JoinGroupKey -> createJoinGroupRequest, + RequestKeys.SyncGroupKey -> createSyncGroupRequest, + RequestKeys.OffsetCommitKey -> createOffsetCommitRequest, + RequestKeys.HeartbeatKey -> createHeartbeatRequest, + RequestKeys.LeaveGroupKey -> createLeaveGroupRequest, + RequestKeys.LeaderAndIsrKey -> createLeaderAndIsrRequest, + RequestKeys.StopReplicaKey -> createStopReplicaRequest, + RequestKeys.ControlledShutdownKey -> createControlledShutdownRequest + ) + val socket = new Socket("localhost", servers.head.boundPort()) - for ((key, request) <- RequestKeyToRequest) { + for ((key, request) <- requestKeyToRequest) { removeAllAcls - - sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.AuthorizationCode) - + val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = false) for ((resource, acls) <- RequestKeysToAcls(key)) addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseErrorCode(socket, key, request, resources, isAuthorized = true) + } + } - sendRequestAndVerifyResponseErrorCode(socket, key, request, ErrorMapping.NoError) + @Test + def testProduceWithNoTopicAccess() { + try { + sendRecords(numRecords, tp) + fail("sendRecords should have thrown") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } - @Test - def testProduceNeedsAuthorization() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) - try { - sendRecords(numRecords, tp) - Assert.fail("should have thrown exception") - } catch { - case e: ApiException => Assert.assertEquals(Errors.AUTHORIZATION_FAILED.exception().getMessage, e.getMessage) - } + @Test + def testProduceWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + try { + sendRecords(numRecords, tp) + fail("sendRecords should have thrown") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } + } - @Test - def testOnlyWritePermissionAllowsWritingToProducer() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + @Test + def testProduceWithTopicRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + try { sendRecords(numRecords, tp) + fail("sendRecords should have thrown") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } + } - @Test - def testCreatePermissionNeededForWritingToNonExistentTopic() { - val newTopic = "newTopic" - val topicPartition = new TopicPartition(newTopic, 0) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) - try { - sendRecords(numRecords, topicPartition) - Assert.fail("should have thrown exception") - } catch { - case e: TimeoutException => - //TODO Need to update the producer so it actually throws the server side of exception. - case e: Exception => Assert.fail(s"Only timeout exception should be thrown but $e thrown") - } + @Test + def testProduceWithTopicWrite() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(numRecords, tp) + } - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create), - new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)), Resource.ClusterResource) + @Test + def testCreatePermissionNeededForWritingToNonExistentTopic() { + val newTopic = "newTopic" + val topicPartition = new TopicPartition(newTopic, 0) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, newTopic)) + try { sendRecords(numRecords, topicPartition) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } - @Test - def testConsumerNeedsAuthorization() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - //TODO: Ideally we would want to test that when consumerGroup permission is not present we still get an AuthorizationException - //but the consumer fetcher currently waits forever for the consumer metadata to become available. - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - sendRecords(1, tp) - try { - this.consumers.head.assign(List(tp).asJava) - consumeRecords(this.consumers.head) - Assert.fail("should have thrown exception") - } catch { - case e: AuthorizationException => Assert.assertEquals("Not authorized to read from topic-0", e.getMessage) - } + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + sendRecords(numRecords, topicPartition) + } + + @Test(expected = classOf[AuthorizationException]) + def testConsumeWithNoAccess(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + } + + @Test + def testConsumeWithNoGroupAccess(): Unit = { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + try { + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: GroupAuthorizationException => assertEquals(group, e.groupId()) + } + } + + @Test + def testConsumeWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + try { + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + } + } + + @Test + def testConsumeWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + try { + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); } + } - @Test - def testAllowingReadOnTopicAndGroupAllowsReading() { - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - sendRecords(1, tp) + @Test + def testConsumeWithTopicWrite() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + try { this.consumers.head.assign(List(tp).asJava) consumeRecords(this.consumers.head) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + } + } + + @Test + def testConsumeWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.assign(List(tp).asJava) + consumeRecords(this.consumers.head) + } + + @Test + def testCreatePermissionNeededToReadFromNonExistentTopic() { + val newTopic = "newTopic" + val topicPartition = new TopicPartition(newTopic, 0) + val newTopicResource = new Resource(Topic, newTopic) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource) + addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) + addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource) + try { + this.consumers(0).assign(List(topicPartition).asJava) + consumeRecords(this.consumers(0)) + Assert.fail("should have thrown exception") + } catch { + case e: TopicAuthorizationException => + assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()); } -// TODO: The following test goes into an infinite loop as consumer waits for consumer metadata to be propogated for ever. -// @Test -// def testCreatePermissionNeededToReadFromNonExistentTopic() { -// val newTopic = "newTopic" -// val topicPartition = new TopicPartition(newTopic, 0) -// val newTopicResource = new Resource(Topic, newTopic) -// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), newTopicResource) -// addAndVerifyAcls(GroupReadAcl(groupResource), groupResource) -// addAndVerifyAcls(ClusterAcl(Resource.ClusterResource), Resource.ClusterResource) -// try { -// this.consumers(0).assign(List(topicPartition).asJava) -// consumeRecords(this.consumers(0)) -// Assert.fail("should have thrown exception") -// } catch { -// //checking for the message and type to ensure whenever these things are fixed on client side the test starts failing. -// case e: ApiException => Assert.assertEquals(e.getMessage, "Request is not authorized.") -// } -// -// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) -// addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) -// -// sendRecords(numRecords, topicPartition) -// consumeRecords(this.consumers(0)) -// } + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)), Resource.ClusterResource) + + sendRecords(numRecords, topicPartition) + consumeRecords(this.consumers(0), topic = newTopic, part = 0) + } + + @Test(expected = classOf[AuthorizationException]) + def testCommitWithNoAccess() { + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testCommitWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testCommitWithTopicWrite() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testCommitWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[GroupAuthorizationException]) + def testCommitWithNoGroupAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test + def testCommitWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) + } + + @Test(expected = classOf[AuthorizationException]) + def testOffsetFetchWithNoAccess() { + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test(expected = classOf[GroupAuthorizationException]) + def testOffsetFetchWithNoGroupAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test(expected = classOf[TopicAuthorizationException]) + def testOffsetFetchWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test + def testOffsetFetchTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } + + @Test + def testOffsetFetchWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + this.consumers.head.assign(List(tp).asJava) + this.consumers.head.position(tp) + } def removeAllAcls() = { servers.head.apis.authorizer.get.getAcls().keys.foreach { resource => @@ -308,7 +506,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { } } - def sendRequestAndVerifyResponseErrorCode(socket: Socket, key: Short, request: AbstractRequest, expectedErrorCode: Short): AbstractRequestResponse = { + def sendRequestAndVerifyResponseErrorCode(socket: Socket, + key: Short, + request: AbstractRequest, + resources: Set[ResourceType], + isAuthorized: Boolean): AbstractRequestResponse = { val header = new RequestHeader(key, "client", 1) val body = request.toStruct @@ -323,7 +525,14 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { ResponseHeader.parse(resp) val response = RequestKeyToResponseDeserializer(key).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse] - Assert.assertEquals(s"$key failed", expectedErrorCode, RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response)) + val errorCode = RequestKeyToErrorCode(key).asInstanceOf[(AbstractRequestResponse) => Short](response) + + val possibleErrorCodes = resources.map(_.errorCode) + if (isAuthorized) + assertFalse(s"${ApiKeys.forId(key)} should be allowed", possibleErrorCodes.contains(errorCode)) + else + assertTrue(s"${ApiKeys.forId(key)} should be forbidden", possibleErrorCodes.contains(errorCode)) + response } @@ -364,8 +573,11 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { } - private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int = - 0) { + private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], + numRecords: Int = 1, + startingOffset: Int = 0, + topic: String = topic, + part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 var iters = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java index d43fc53..2a5ca9b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -25,6 +25,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,7 +43,7 @@ public class DefaultPartitionGrouperTest { new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos); + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); @Test public void testGrouping() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c39e79bb/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d8141d1..909df13 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -77,7 +77,7 @@ public class StreamThreadTest { new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos); + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"));
