Repository: kafka Updated Branches: refs/heads/0.11.0 b1313935f -> bc6a3bc6f
MINOR: A few cleanups in KafkaApis and TransactionMarkerChannelManager Author: Jason Gustafson <[email protected]> Reviewers: Colin P. Mccabe <[email protected]>, Ismael Juma <[email protected]> Closes #3171 from hachikuji/minor-txn-channel-cleanups Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/63605779 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/63605779 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/63605779 Branch: refs/heads/0.11.0 Commit: 63605779ef67fb39487cf0487c8ac4caa8d39cbc Parents: b131393 Author: Jason Gustafson <[email protected]> Authored: Sat Jun 17 02:03:25 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 17 14:04:32 2017 +0100 ---------------------------------------------------------------------- .../common/requests/DeleteAclsResponse.java | 8 + .../kafka/common/InterBrokerSendThread.scala | 29 +-- .../TransactionMarkerChannelManager.scala | 22 +-- .../scala/kafka/security/SecurityUtils.scala | 48 +++++ .../kafka/security/auth/PermissionType.scala | 5 - .../src/main/scala/kafka/server/KafkaApis.scala | 187 ++++++------------- .../common/InterBrokerSendThreadTest.scala | 16 +- .../TransactionMarkerChannelManagerTest.scala | 20 +- 8 files changed, 151 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 796e200..94cd6aa 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -49,6 +49,10 @@ public class DeleteAclsResponse extends AbstractResponse { this.acl = acl; } + public AclDeletionResult(AclBinding acl) { + this(null, acl); + } + public ApiException exception() { return exception; } @@ -72,6 +76,10 @@ public class DeleteAclsResponse extends AbstractResponse { this.deletions = deletions; } + public AclFilterResponse(Collection<AclDeletionResult> deletions) { + this(null, deletions); + } + public Throwable throwable() { return throwable; } http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/common/InterBrokerSendThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index 886e41c..06158b2 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -27,24 +27,27 @@ import org.apache.kafka.common.utils.Time /** * Class for inter-broker send thread that utilize a non-blocking network client. */ -class InterBrokerSendThread(name: String, - networkClient: NetworkClient, - requestGenerator: () => Iterable[RequestAndCompletionHandler], - time: Time, - isInterruptible: Boolean = true) +abstract class InterBrokerSendThread(name: String, + networkClient: NetworkClient, + time: Time, + isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { - // visible for testing - def generateRequests(): Iterable[RequestAndCompletionHandler] = requestGenerator() + def generateRequests(): Iterable[RequestAndCompletionHandler] + + override def shutdown(): Unit = { + initiateShutdown() + // wake up the thread in case it is blocked inside poll + networkClient.wakeup() + awaitShutdown() + } override def doWork() { val now = time.milliseconds() var pollTimeout = Long.MaxValue try { - val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator() - - for (request: RequestAndCompletionHandler <- requestsToSend) { + for (request: RequestAndCompletionHandler <- generateRequests()) { val destination = Integer.toString(request.destination.id()) val completionHandler = request.handler val clientRequest = networkClient.newClientRequest(destination, @@ -79,6 +82,10 @@ class InterBrokerSendThread(name: String, throw new FatalExitError() } } + + def wakeup(): Unit = networkClient.wakeup() + } -case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], handler: RequestCompletionHandler) \ No newline at end of file +case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], + handler: RequestCompletionHandler) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index 22f01c1..9c3ffd9 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -122,16 +122,12 @@ class TransactionMarkerChannelManager(config: KafkaConfig, networkClient: NetworkClient, txnStateManager: TransactionStateManager, txnMarkerPurgatory: DelayedOperationPurgatory[DelayedTxnMarker], - time: Time) extends Logging with KafkaMetricsGroup { + time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup { this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: " private val interBrokerListenerName: ListenerName = config.interBrokerListenerName - private val txnMarkerSendThread: InterBrokerSendThread = { - new InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, drainQueuedTransactionMarkers, time) - } - private val markersQueuePerBroker: concurrent.Map[Int, TxnMarkerQueue] = new ConcurrentHashMap[Int, TxnMarkerQueue]().asScala private val markersQueueForUnknownBroker = new TxnMarkerQueue(Node.noNode) @@ -152,15 +148,10 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } ) - def start(): Unit = { - txnMarkerSendThread.start() - } + override def generateRequests() = drainQueuedTransactionMarkers() - def shutdown(): Unit = { - txnMarkerSendThread.initiateShutdown() - // wake up the thread in case it is blocked inside poll - networkClient.wakeup() - txnMarkerSendThread.awaitShutdown() + override def shutdown(): Unit = { + super.shutdown() txnMarkerPurgatory.shutdown() markersQueuePerBroker.clear() } @@ -173,9 +164,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig, // visible for testing private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker - // visible for testing - private[transaction] def senderThread = txnMarkerSendThread - private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry) { val brokerId = broker.id @@ -369,7 +357,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } } - networkClient.wakeup() + wakeup() } def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/SecurityUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala new file mode 100644 index 0000000..bbfc42c --- /dev/null +++ b/core/src/main/scala/kafka/security/SecurityUtils.scala @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security + +import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType} +import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter} +import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.security.auth.KafkaPrincipal + +import scala.util.Try + + +object SecurityUtils { + + def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] = { + for { + resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType)) + principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal)) + operation <- Try(Operation.fromJava(filter.entryFilter.operation)) + permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType)) + resource = Resource(resourceType, filter.resourceFilter.name) + acl = Acl(principal, permissionType, filter.entryFilter.host, operation) + } yield (resource, acl) + } + + def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = { + val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name) + val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString, + acl.operation.toJava, acl.permissionType.toJava) + new AclBinding(adminResource, entry) + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/security/auth/PermissionType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala index 686c60b..c603351 100644 --- a/core/src/main/scala/kafka/security/auth/PermissionType.scala +++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala @@ -21,11 +21,6 @@ import org.apache.kafka.common.acl.AclPermissionType import scala.util.{Failure, Success, Try} -/** - * PermissionType. - */ - - sealed trait PermissionType extends BaseEnum { val toJava: AclPermissionType } http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 27eb816..337c740 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -34,6 +34,7 @@ import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} import kafka.log.{Log, LogManager, TimestampOffset} import kafka.network.{RequestChannel, RequestOrResponseSend} +import kafka.security.SecurityUtils import kafka.security.auth._ import kafka.utils.{CoreUtils, Exit, Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors._ @@ -50,13 +51,12 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.requests.SaslHandshakeResponse -import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType} -import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding} import scala.collection._ import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} /** @@ -1782,70 +1782,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - /** - * Convert an ACL binding filter to a Scala object. - * All ACL and resource fields must be specified (no UNKNOWN, ANY, or null fields are allowed.) - * - * @param filter The binding filter as a Java object. - * @return The binding filter as a scala object, or an exception if there was an error - * converting the Java object. - */ - def toScala(filter: AclBindingFilter) : Try[(Resource, Acl)] = { - filter.resourceFilter().resourceType() match { - case AdminResourceType.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN resource type")) - case AdminResourceType.ANY => return Failure(new InvalidRequestException("Invalid ANY resource type")) - case _ => {} - } - val resourceType: ResourceType = try { - ResourceType.fromJava(filter.resourceFilter.resourceType) - } catch { - case throwable: Throwable => return Failure(new InvalidRequestException("Invalid resource type")) - } - val principal: KafkaPrincipal = try { - KafkaPrincipal.fromString(filter.entryFilter.principal) - } catch { - case throwable: Throwable => return Failure(new InvalidRequestException("Invalid principal")) - } - filter.entryFilter().operation() match { - case AclOperation.UNKNOWN => return Failure(new InvalidRequestException("Invalid UNKNOWN operation type")) - case AclOperation.ANY => return Failure(new InvalidRequestException("Invalid ANY operation type")) - case _ => {} - } - val operation: Operation = try { - Operation.fromJava(filter.entryFilter.operation) - } catch { - case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage)) - } - filter.entryFilter().permissionType() match { - case AclPermissionType.UNKNOWN => new InvalidRequestException("Invalid UNKNOWN permission type") - case AclPermissionType.ANY => new InvalidRequestException("Invalid ANY permission type") - case _ => {} - } - val permissionType: PermissionType = try { - PermissionType.fromJava(filter.entryFilter.permissionType) - } catch { - case throwable: Throwable => return Failure(new InvalidRequestException(throwable.getMessage)) - } - return Success((Resource(resourceType, filter.resourceFilter().name()), Acl(principal, permissionType, - filter.entryFilter().host(), operation))) - } - - /** - * Convert a Scala ACL binding to a Java object. - * - * @param acl The binding as a Scala object. - * @return The binding as a Java object. - */ - def toJava(acl: (Resource, Acl)) : AclBinding = { - acl match { - case (resource, acl) => - val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name) - val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString, - acl.operation.toJava, acl.permissionType.toJava) - return new AclBinding(adminResource, entry) - } - } - def handleCreateAcls(request: RequestChannel.Request): Unit = { authorizeClusterAlter(request) val createAclsRequest = request.body[CreateAclsRequest] @@ -1855,11 +1791,9 @@ class KafkaApis(val requestChannel: RequestChannel, createAclsRequest.getErrorResponse(requestThrottleMs, new SecurityDisabledException("No Authorizer is configured on the broker."))) case Some(auth) => - val errors = mutable.HashMap[Int, Throwable]() - for (i <- 0 until createAclsRequest.aclCreations.size) { - val result = toScala(createAclsRequest.aclCreations.get(i).acl.toFilter) - result match { - case Failure(throwable) => errors.put(i, throwable) + val aclCreationResults = createAclsRequest.aclCreations.asScala.map { aclCreation => + SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) match { + case Failure(throwable) => new AclCreationResponse(throwable) case Success((resource, acl)) => try { if (resource.resourceType.equals(Cluster) && !resource.name.equals(Resource.ClusterResourceName)) @@ -1868,25 +1802,19 @@ class KafkaApis(val requestChannel: RequestChannel, if (resource.name.isEmpty) throw new InvalidRequestException("Invalid empty resource name") auth.addAcls(immutable.Set(acl), resource) - if (logger.isDebugEnabled) - logger.debug(s"Added acl $acl to $resource") + + logger.debug(s"Added acl $acl to $resource") + + new AclCreationResponse(null) } catch { - case throwable : Throwable => if (logger.isDebugEnabled) { - logger.debug(s"Failed to add acl $acl to $resource", throwable) - } - errors.put(i, throwable) + case throwable: Throwable => + logger.debug(s"Failed to add acl $acl to $resource", throwable) + new AclCreationResponse(throwable) } } } - val aclCreationResults = new java.util.ArrayList[AclCreationResponse] - for (i <- 0 to createAclsRequest.aclCreations().size() - 1) { - errors.get(i) match { - case Some(throwable) => aclCreationResults.add(new AclCreationResponse(throwable)) - case None => aclCreationResults.add(new AclCreationResponse(null)) - } - } sendResponseMaybeThrottle(request, requestThrottleMs => - new CreateAclsResponse(requestThrottleMs, aclCreationResults)) + new CreateAclsResponse(requestThrottleMs, aclCreationResults.asJava)) } } @@ -1899,60 +1827,53 @@ class KafkaApis(val requestChannel: RequestChannel, deleteAclsRequest.getErrorResponse(requestThrottleMs, new SecurityDisabledException("No Authorizer is configured on the broker."))) case Some(auth) => - val filterResponseMap = mutable.HashMap[Int, AclFilterResponse]() - val toDelete = mutable.HashMap[Int, ListBuffer[(Resource, Acl)]]() - for (i <- 0 to deleteAclsRequest.filters().size - 1) { - toDelete.put(i, new ListBuffer[(Resource, Acl)]()) - } - if (deleteAclsRequest.filters().asScala.exists { f => !f.matchesAtMostOne() }) { - // Delete based on filters that may match more than one ACL. - val aclMap : Map[Resource, Set[Acl]] = auth.getAcls() - aclMap.foreach { case (resource, acls) => - acls.foreach { acl => - val binding = new AclBinding(new AdminResource(AdminResourceType. - fromString(resource.resourceType.toString), resource.name), - new AccessControlEntry(acl.principal.toString(), acl.host.toString(), - acl.operation.toJava, acl.permissionType.toJava)) - for (i <- 0 to deleteAclsRequest.filters().size - 1) { - val filter = deleteAclsRequest.filters().get(i) - if (filter.matches(binding)) { - toDelete.get(i).get += ((resource, acl)) - } - } + val filters = deleteAclsRequest.filters.asScala + val filterResponseMap = mutable.Map[Int, AclFilterResponse]() + val toDelete = mutable.Map[Int, ArrayBuffer[(Resource, Acl)]]() + + if (filters.forall(_.matchesAtMostOne)) { + // Delete based on a list of ACL fixtures. + for ((filter, i) <- filters.zipWithIndex) { + SecurityUtils.convertToResourceAndAcl(filter) match { + case Failure(throwable) => filterResponseMap.put(i, new AclFilterResponse(throwable, Seq.empty.asJava)) + case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture)) } } } else { - // Delete based on a list of ACL fixtures. - for (i <- 0 to deleteAclsRequest.filters().size - 1) { - toScala(deleteAclsRequest.filters().get(i)) match { - case Failure(throwable) => filterResponseMap.put(i, - new AclFilterResponse(throwable, Collections.emptySet[AclDeletionResult]())) - case Success(fixture) => toDelete.put(i, ListBuffer(fixture)) - } + // Delete based on filters that may match more than one ACL. + val aclMap = auth.getAcls() + val filtersWithIndex = filters.zipWithIndex + for ((resource, acls) <- aclMap; acl <- acls) { + val binding = new AclBinding( + new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), + new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, + acl.permissionType.toJava)) + + for ((filter, i) <- filtersWithIndex if filter.matches(binding)) + toDelete.getOrElseUpdate(i, ArrayBuffer.empty) += ((resource, acl)) } } - for (i <- toDelete.keys) { - val deletionResults = new util.ArrayList[AclDeletionResult]() - for (acls <- toDelete.get(i)) { - for ((resource, acl) <- acls) { - try { - if (auth.removeAcls(immutable.Set(acl), resource)) { - deletionResults.add(new AclDeletionResult(null, toJava((resource, acl)))) - } - } catch { - case throwable: Throwable => deletionResults.add(new AclDeletionResult( - new UnknownServerException("Failed to delete ACL: " + throwable.toString), - toJava((resource, acl)))) - } + + for ((i, acls) <- toDelete) { + val deletionResults = acls.flatMap { case (resource, acl) => + val aclBinding = SecurityUtils.convertToAclBinding(resource, acl) + try { + if (auth.removeAcls(immutable.Set(acl), resource)) + Some(new AclDeletionResult(aclBinding)) + else None + } catch { + case throwable: Throwable => + Some(new AclDeletionResult(new UnknownServerException(s"Failed to delete ACL $acl: $throwable"), + aclBinding)) } - } - filterResponseMap.put(i, new AclFilterResponse(null, deletionResults)) - } - val filterResponses = new util.ArrayList[AclFilterResponse] - for (i <- 0 to deleteAclsRequest.filters().size() - 1) { - filterResponses.add(filterResponseMap.getOrElse(i, - new AclFilterResponse(null, new util.ArrayList[AclDeletionResult]()))) + }.asJava + + filterResponseMap.put(i, new AclFilterResponse(deletionResults)) } + + val filterResponses = filters.indices.map { i => + filterResponseMap.getOrElse(i, new AclFilterResponse(Seq.empty.asJava)) + }.asJava sendResponseMaybeThrottle(request, requestThrottleMs => new DeleteAclsResponse(requestThrottleMs, filterResponses)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala index a508c41..c6ebdd1 100644 --- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala +++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala @@ -34,7 +34,9 @@ class InterBrokerSendThreadTest { @Test def shouldNotSendAnythingWhenNoRequests(): Unit = { - val sendThread = new InterBrokerSendThread("name", networkClient, () => mutable.Iterable.empty, time) + val sendThread = new InterBrokerSendThread("name", networkClient, time) { + override def generateRequests() = mutable.Iterable.empty + } // poll is always called but there should be no further invocations on NetworkClient EasyMock.expect(networkClient.poll(EasyMock.anyLong(), EasyMock.anyLong())) @@ -52,9 +54,9 @@ class InterBrokerSendThreadTest { val request = new StubRequestBuilder() val node = new Node(1, "", 8080) val handler = RequestAndCompletionHandler(node, request, completionHandler) - val sendThread = new InterBrokerSendThread("name", networkClient, () => { - List[RequestAndCompletionHandler](handler) - }, time) + val sendThread = new InterBrokerSendThread("name", networkClient, time) { + override def generateRequests() = List[RequestAndCompletionHandler](handler) + } val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, handler.handler) @@ -86,9 +88,9 @@ class InterBrokerSendThreadTest { val request = new StubRequestBuilder val node = new Node(1, "", 8080) val requestAndCompletionHandler = RequestAndCompletionHandler(node, request, completionHandler) - val sendThread = new InterBrokerSendThread("name", networkClient, () => { - List[RequestAndCompletionHandler](requestAndCompletionHandler) - }, time) + val sendThread = new InterBrokerSendThread("name", networkClient, time) { + override def generateRequests() = List[RequestAndCompletionHandler](requestAndCompletionHandler) + } val clientRequest = new ClientRequest("dest", request, 0, "1", 0, true, requestAndCompletionHandler.handler) http://git-wip-us.apache.org/repos/asf/kafka/blob/63605779/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala index 01a350b..9e7fb13 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala @@ -73,8 +73,6 @@ class TransactionMarkerChannelManagerTest { txnMarkerPurgatory, time) - private val senderThread = channelManager.senderThread - private def mockCache(): Unit = { EasyMock.expect(txnStateManager.partitionFor(transactionalId1)) .andReturn(txnTopicPartition1) @@ -93,7 +91,7 @@ class TransactionMarkerChannelManagerTest { @Test def shouldGenerateEmptyMapWhenNoRequestsOutstanding(): Unit = { - assertTrue(senderThread.generateRequests().isEmpty) + assertTrue(channelManager.generateRequests().isEmpty) } @Test @@ -131,12 +129,12 @@ class TransactionMarkerChannelManagerTest { val expectedBroker2Request = new WriteTxnMarkersRequest.Builder( Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build() - val requests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler => + val requests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) }.toMap assertEquals(Map(broker1 -> expectedBroker1Request, broker2 -> expectedBroker2Request), requests) - assertTrue(senderThread.generateRequests().isEmpty) + assertTrue(channelManager.generateRequests().isEmpty) } @Test @@ -208,13 +206,13 @@ class TransactionMarkerChannelManagerTest { val expectedBroker2Request = new WriteTxnMarkersRequest.Builder( Utils.mkList(new WriteTxnMarkersRequest.TxnMarkerEntry(producerId1, producerEpoch, coordinatorEpoch, txnResult, Utils.mkList(partition2)))).build() - val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler => + val firstDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) }.toMap assertEquals(Map(broker2 -> expectedBroker2Request), firstDrainedRequests) - val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = senderThread.generateRequests().map { handler => + val secondDrainedRequests: Map[Node, WriteTxnMarkersRequest] = channelManager.generateRequests().map { handler => (handler.destination, handler.request.asInstanceOf[WriteTxnMarkersRequest.Builder].build()) }.toMap @@ -294,7 +292,7 @@ class TransactionMarkerChannelManagerTest { channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2) - val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests() + val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests() val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) for (requestAndHandler <- requestAndHandlers) { @@ -342,7 +340,7 @@ class TransactionMarkerChannelManagerTest { channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2) - val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests() + val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests() val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) for (requestAndHandler <- requestAndHandlers) { @@ -396,7 +394,7 @@ class TransactionMarkerChannelManagerTest { channelManager.addTxnMarkersToSend(transactionalId2, coordinatorEpoch, txnResult, txnMetadata2, txnTransitionMetadata2) - val requestAndHandlers: Iterable[RequestAndCompletionHandler] = senderThread.generateRequests() + val requestAndHandlers: Iterable[RequestAndCompletionHandler] = channelManager.generateRequests() val response = new WriteTxnMarkersResponse(createPidErrorMap(Errors.NONE)) for (requestAndHandler <- requestAndHandlers) { @@ -404,7 +402,7 @@ class TransactionMarkerChannelManagerTest { } // call this again so that append log will be retried - senderThread.generateRequests() + channelManager.generateRequests() EasyMock.verify(txnStateManager)
