Repository: kafka Updated Branches: refs/heads/0.9.0 07e214130 -> 2b3f6d204
KAFKA-2869; Host used by Authorizer should be IP address not hostname/IP Author: Ismael Juma <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #567 from ijuma/kafka-2869-host-used-by-authorizer-should-be-ip (cherry picked from commit d0614f97bc1c45734631770c46ab6a79fc8c8547) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b3f6d20 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b3f6d20 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b3f6d20 Branch: refs/heads/0.9.0 Commit: 2b3f6d204a75d1a9ca84b05a6b6a3388f2f16a81 Parents: 07e2141 Author: Ismael Juma <[email protected]> Authored: Fri Nov 20 08:28:20 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Fri Nov 20 08:28:28 2015 -0800 ---------------------------------------------------------------------- .../scala/kafka/network/RequestChannel.scala | 5 ++- .../main/scala/kafka/network/SocketServer.scala | 7 ++-- .../security/auth/SimpleAclAuthorizer.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 12 +++--- .../security/auth/SimpleAclAuthorizerTest.scala | 41 ++++++++++---------- 5 files changed, 36 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 9ea4079..4044f62 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -17,6 +17,7 @@ package kafka.network +import java.net.InetAddress import java.nio.ByteBuffer import java.security.Principal import java.util.concurrent._ @@ -35,7 +36,7 @@ import org.apache.log4j.Logger object RequestChannel extends Logging { - val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, ""), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) + val AllDone = new Request(processor = 1, connectionId = "2", new Session(KafkaPrincipal.ANONYMOUS, InetAddress.getLocalHost()), buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) def getShutdownReceive() = { val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) @@ -46,7 +47,7 @@ object RequestChannel extends Logging { byteBuffer } - case class Session(principal: KafkaPrincipal, host: String) + case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) case class Request(processor: Int, connectionId: String, session: Session, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) { // These need to be volatile because the readers are in the network thread and the writers are in the request http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index cb38153..69a9569 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -421,7 +421,8 @@ private[kafka] class Processor(val id: Int, selector.completedReceives.asScala.foreach { receive => try { val channel = selector.channel(receive.source) - val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal().getName), channel.socketDescription) + val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName), + channel.socketAddress) val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol) requestChannel.sendRequest(req) } catch { @@ -434,11 +435,11 @@ private[kafka] class Processor(val id: Int, } selector.completedSends.asScala.foreach { send => - val resp = inflightResponses.remove(send.destination()).getOrElse { + val resp = inflightResponses.remove(send.destination).getOrElse { throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`") } resp.request.updateRequestMetrics() - selector.unmute(send.destination()) + selector.unmute(send.destination) } selector.disconnected.asScala.foreach { connectionId => http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index 76645da..cae8f2a 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -110,7 +110,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { override def authorize(session: Session, operation: Operation, resource: Resource): Boolean = { val principal: KafkaPrincipal = session.principal - val host = session.host + val host = session.clientAddress.getHostAddress val acls = getAcls(resource) ++ getAcls(new Resource(resource.resourceType, Resource.WildCardResource)) //check if there is any Deny acl match that would disallow this operation. @@ -286,4 +286,4 @@ class SimpleAclAuthorizer extends Authorizer with Logging { } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bb50e40..ade879b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -791,12 +791,12 @@ class KafkaApis(val requestChannel: RequestChannel, val protocols = joinGroupRequest.groupProtocols().map(protocol => (protocol.name, Utils.toArray(protocol.metadata))).toList coordinator.handleJoinGroup( - joinGroupRequest.groupId(), - joinGroupRequest.memberId(), - request.header.clientId(), - request.session.host, - joinGroupRequest.sessionTimeout(), - joinGroupRequest.protocolType(), + joinGroupRequest.groupId, + joinGroupRequest.memberId, + request.header.clientId, + request.session.clientAddress.toString, + joinGroupRequest.sessionTimeout, + joinGroupRequest.protocolType, protocols, sendResponseCallback) } http://git-wip-us.apache.org/repos/asf/kafka/blob/2b3f6d20/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala index 76a768a..a4f61df 100644 --- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala +++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala @@ -16,12 +16,13 @@ */ package kafka.security.auth +import java.net.InetAddress import java.util.UUID import kafka.network.RequestChannel.Session import kafka.security.auth.Acl.WildCardHost import kafka.server.KafkaConfig -import kafka.utils.{ZkUtils, TestUtils} +import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Assert._ @@ -29,10 +30,10 @@ import org.junit.{Before, Test} class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { - var simpleAclAuthorizer = new SimpleAclAuthorizer + val simpleAclAuthorizer = new SimpleAclAuthorizer val testPrincipal = Acl.WildCardPrincipal - val testHostName = "test.host.com" - var session = new Session(testPrincipal, testHostName) + val testHostName = InetAddress.getByName("192.168.0.1") + val session = new Session(testPrincipal, testHostName) var resource: Resource = null val superUsers = "User:superuser1; User:superuser2" val username = "alice" @@ -55,18 +56,18 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob") val user3 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "batman") - val host1 = "host1" - val host2 = "host2" + val host1 = InetAddress.getByName("192.168.1.1") + val host2 = InetAddress.getByName("192.168.1.2") //user1 has READ access from host1 and host2. - val acl1 = new Acl(user1, Allow, host1, Read) - val acl2 = new Acl(user1, Allow, host2, Read) + val acl1 = new Acl(user1, Allow, host1.getHostAddress, Read) + val acl2 = new Acl(user1, Allow, host2.getHostAddress, Read) //user1 does not have READ access from host1. - val acl3 = new Acl(user1, Deny, host1, Read) + val acl3 = new Acl(user1, Deny, host1.getHostAddress, Read) //user1 has Write access from host1 only. - val acl4 = new Acl(user1, Allow, host1, Write) + val acl4 = new Acl(user1, Allow, host1.getHostAddress, Write) //user1 has DESCRIBE access from all hosts. val acl5 = new Acl(user1, Allow, WildCardHost, Describe) @@ -105,11 +106,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { @Test def testDenyTakesPrecedence() { val user = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) - val host = "random-host" + val host = InetAddress.getByName("192.168.2.1") val session = new Session(user, host) val allowAll = Acl.AllowAllAcl - val denyAcl = new Acl(user, Deny, host, All) + val denyAcl = new Acl(user, Deny, host.getHostAddress, All) val acls = Set[Acl](allowAll, denyAcl) changeAclAndVerify(Set.empty[Acl], acls, Set.empty[Acl]) @@ -123,7 +124,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { changeAclAndVerify(Set.empty[Acl], Set[Acl](allowAllAcl), Set.empty[Acl]) - val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), "random.host") + val session = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "random"), InetAddress.getByName("192.0.4.4")) assertTrue("allow all acl should allow access to all.", simpleAclAuthorizer.authorize(session, Read, resource)) } @@ -133,8 +134,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { changeAclAndVerify(Set.empty[Acl], Set[Acl](denyAllAcl), Set.empty[Acl]) - val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), "random.host") - val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), "random.host") + val session1 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser1"), InetAddress.getByName("192.0.4.4")) + val session2 = new Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "superuser2"), InetAddress.getByName("192.0.4.4")) assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session1, Read, resource)) assertTrue("superuser always has access, no matter what acls.", simpleAclAuthorizer.authorize(session2, Read, resource)) @@ -145,8 +146,8 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertFalse("when acls = [], authorizer should fail close.", simpleAclAuthorizer.authorize(session, Read, resource)) val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username) - val host1 = "host1" - val readAcl = new Acl(user1, Allow, host1, Read) + val host1 = InetAddress.getByName("192.168.3.1") + val readAcl = new Acl(user1, Allow, host1.getHostAddress, Read) val wildCardResource = new Resource(resource.resourceType, Resource.WildCardResource) val acls = changeAclAndVerify(Set.empty[Acl], Set[Acl](readAcl), Set.empty[Acl], wildCardResource) @@ -155,11 +156,11 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { assertTrue("User1 should have Read access from host1", simpleAclAuthorizer.authorize(host1Session, Read, resource)) //allow Write to specific topic. - val writeAcl = new Acl(user1, Allow, host1, Write) + val writeAcl = new Acl(user1, Allow, host1.getHostAddress, Write) changeAclAndVerify(Set.empty[Acl], Set[Acl](writeAcl), Set.empty[Acl]) //deny Write to wild card topic. - val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1, Write) + val denyWriteOnWildCardResourceAcl = new Acl(user1, Deny, host1.getHostAddress, Write) changeAclAndVerify(acls, Set[Acl](denyWriteOnWildCardResourceAcl), Set.empty[Acl], wildCardResource) assertFalse("User1 should not have Write access from host1", simpleAclAuthorizer.authorize(host1Session, Write, resource)) @@ -265,4 +266,4 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness { acls } -} \ No newline at end of file +}
