This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 4c602e6  KAFKA-7498: Remove references from `common.requests` to `clients` (#5784)
4c602e6 is described below

commit 4c602e6130e869abbd111d5d4efe1c2046935c6c
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Oct 15 13:21:15 2018 +0100

    KAFKA-7498: Remove references from `common.requests` to `clients` (#5784)

    Add CreatePartitionsRequest.PartitionDetails similar to CreateTopicsRequest.TopicDetails
    to avoid references from `common.requests` package to `clients`.

    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 checkstyle/import-control.xml                       |  1 -
 .../kafka/clients/admin/KafkaAdminClient.java       |  9 +++-
 .../common/requests/CreatePartitionsRequest.java    | 63 ++++++++++++++++------
 .../kafka/common/requests/RequestResponseTest.java  | 14 ++---
 .../src/main/scala/kafka/server/AdminManager.scala  |  6 +--
 core/src/main/scala/kafka/server/KafkaApis.scala    |  5 +-
 .../kafka/api/AuthorizerIntegrationTest.scala       |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala  |  3 +-
 8 files changed, 68 insertions(+), 35 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7810a3e..91d23f6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -131,7 +131,6 @@
     </subpackage>
 
     <subpackage name="requests">
-      <allow pkg="org.apache.kafka.clients.admin" />
       <allow pkg="org.apache.kafka.common.acl" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c8418c1..b0120e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -80,6 +80,7 @@ import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -141,6 +142,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -2083,7 +2085,8 @@ public class KafkaAdminClient extends AdminClient {
         for (String topic : newPartitions.keySet()) {
             futures.put(topic, new KafkaFutureImpl<>());
         }
-        final Map<String, NewPartitions> requestMap = new HashMap<>(newPartitions);
+        final Map<String, PartitionDetails> requestMap = newPartitions.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue())));
 
         final long now = time.milliseconds();
         runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()),
@@ -2482,6 +2485,10 @@ public class KafkaAdminClient extends AdminClient {
         return false;
     }
 
+    private PartitionDetails partitionDetails(NewPartitions newPartitions) {
+        return new PartitionDetails(newPartitions.totalCount(), newPartitions.assignments());
+    }
+
     private final static class ListConsumerGroupsResults {
         private final List<Throwable> errors;
         private final HashMap<String, ConsumerGroupListing> listings;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 795a66a..7872cf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -72,17 +71,47 @@ public class CreatePartitionsRequest extends AbstractRequest {
     // It is an error for duplicate topics to be present in the request,
     // so track duplicates here to allow KafkaApis to report per-topic errors.
     private final Set<String> duplicates;
-    private final Map<String, NewPartitions> newPartitions;
+    private final Map<String, PartitionDetails> newPartitions;
     private final int timeout;
     private final boolean validateOnly;
 
+    public static class PartitionDetails {
+
+        private final int totalCount;
+
+        private final List<List<Integer>> newAssignments;
+
+        public PartitionDetails(int totalCount) {
+            this(totalCount, null);
+        }
+
+        public PartitionDetails(int totalCount, List<List<Integer>> newAssignments) {
+            this.totalCount = totalCount;
+            this.newAssignments = newAssignments;
+        }
+
+        public int totalCount() {
+            return totalCount;
+        }
+
+        public List<List<Integer>> newAssignments() {
+            return newAssignments;
+        }
+
+        @Override
+        public String toString() {
+            return "(totalCount=" + totalCount() + ", newAssignments=" + newAssignments() + ")";
+        }
+
+    }
+
     public static class Builder extends AbstractRequest.Builder<CreatePartitionsRequest> {
 
-        private final Map<String, NewPartitions> newPartitions;
+        private final Map<String, PartitionDetails> newPartitions;
         private final int timeout;
         private final boolean validateOnly;
 
-        public Builder(Map<String, NewPartitions> newPartitions, int timeout, boolean validateOnly) {
+        public Builder(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly) {
             super(ApiKeys.CREATE_PARTITIONS);
             this.newPartitions = newPartitions;
             this.timeout = timeout;
@@ -106,7 +135,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
         }
     }
 
-    CreatePartitionsRequest(Map<String, NewPartitions> newPartitions, int timeout, boolean validateOnly, short apiVersion) {
+    CreatePartitionsRequest(Map<String, PartitionDetails> newPartitions, int timeout, boolean validateOnly, short apiVersion) {
         super(ApiKeys.CREATE_PARTITIONS, apiVersion);
         this.newPartitions = newPartitions;
         this.duplicates = Collections.emptySet();
@@ -117,7 +146,7 @@
     public CreatePartitionsRequest(Struct struct, short apiVersion) {
         super(ApiKeys.CREATE_PARTITIONS, apiVersion);
         Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
-        Map<String, NewPartitions> counts = new HashMap<>(topicCountArray.length);
+        Map<String, PartitionDetails> counts = new HashMap<>(topicCountArray.length);
         Set<String> dupes = new HashSet<>();
         for (Object topicPartitionCountObj : topicCountArray) {
             Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj;
@@ -125,7 +154,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
             Struct partitionCountStruct = topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME);
             int count = partitionCountStruct.getInt(COUNT_KEY_NAME);
             Object[] assignmentsArray = partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
-            NewPartitions newPartition;
+            PartitionDetails newPartition;
             if (assignmentsArray != null) {
                 List<List<Integer>> assignments = new ArrayList<>(assignmentsArray.length);
                 for (Object replicas : assignmentsArray) {
@@ -136,11 +165,11 @@ public class CreatePartitionsRequest extends AbstractRequest {
                         replicasList.add((Integer) broker);
                     }
                 }
-                newPartition = NewPartitions.increaseTo(count, assignments);
+                newPartition = new PartitionDetails(count, assignments);
             } else {
-                newPartition = NewPartitions.increaseTo(count);
+                newPartition = new PartitionDetails(count);
             }
-            NewPartitions dupe = counts.put(topic, newPartition);
+            PartitionDetails dupe = counts.put(topic, newPartition);
             if (dupe != null) {
                 dupes.add(topic);
             }
@@ -155,7 +184,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
         return duplicates;
     }
 
-    public Map<String, NewPartitions> newPartitions() {
+    public Map<String, PartitionDetails> newPartitions() {
         return newPartitions;
     }
 
@@ -171,17 +200,17 @@ public class CreatePartitionsRequest extends AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
         List<Struct> topicPartitionsList = new ArrayList<>();
-        for (Map.Entry<String, NewPartitions> topicPartitionCount : this.newPartitions.entrySet()) {
+        for (Map.Entry<String, PartitionDetails> topicPartitionCount : this.newPartitions.entrySet()) {
             Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
             topicPartitionCountStruct.set(TOPIC_NAME, topicPartitionCount.getKey());
-            NewPartitions count = topicPartitionCount.getValue();
+            PartitionDetails partitionDetails = topicPartitionCount.getValue();
             Struct partitionCountStruct = topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
-            partitionCountStruct.set(COUNT_KEY_NAME, count.totalCount());
+            partitionCountStruct.set(COUNT_KEY_NAME, partitionDetails.totalCount());
             Object[][] assignments = null;
-            if (count.assignments() != null) {
-                assignments = new Object[count.assignments().size()][];
+            if (partitionDetails.newAssignments() != null) {
+                assignments = new Object[partitionDetails.newAssignments().size()][];
                 int i = 0;
-                for (List<Integer> partitionAssignment : count.assignments()) {
+                for (List<Integer> partitionAssignment : partitionDetails.newAssignments()) {
                     assignments[i] = partitionAssignment.toArray(new Object[0]);
                     i++;
                 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e34348a..b7f39ef 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
@@ -44,6 +43,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.resource.PatternType;
@@ -1214,16 +1214,16 @@ public class RequestResponseTest {
     }
 
     private CreatePartitionsRequest createCreatePartitionsRequest() {
-        Map<String, NewPartitions> assignments = new HashMap<>();
-        assignments.put("my_topic", NewPartitions.increaseTo(3));
-        assignments.put("my_other_topic", NewPartitions.increaseTo(3));
+        Map<String, PartitionDetails> assignments = new HashMap<>();
+        assignments.put("my_topic", new PartitionDetails(3));
+        assignments.put("my_other_topic", new PartitionDetails(3));
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
     private CreatePartitionsRequest createCreatePartitionsRequestWithAssignments() {
-        Map<String, NewPartitions> assignments = new HashMap<>();
-        assignments.put("my_topic", NewPartitions.increaseTo(3, asList(asList(2))));
-        assignments.put("my_other_topic", NewPartitions.increaseTo(3, asList(asList(2, 3), asList(3, 1))));
+        Map<String, PartitionDetails> assignments = new HashMap<>();
+        assignments.put("my_topic", new PartitionDetails(3, asList(asList(2))));
+        assignments.put("my_other_topic", new PartitionDetails(3, asList(asList(2, 3), asList(3, 1))));
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 2b48170..4ec083a 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -24,13 +24,13 @@ import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException, InvalidConfigurationException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse}
@@ -204,7 +204,7 @@ class AdminManager(val config: KafkaConfig,
   }
 
   def createPartitions(timeout: Int,
-                       newPartitions: Map[String, NewPartitions],
+                       newPartitions: Map[String, PartitionDetails],
                        validateOnly: Boolean,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
@@ -237,7 +237,7 @@ class AdminManager(val config: KafkaConfig,
           throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.")
         }
 
-        val reassignment = Option(newPartition.assignments).map(_.asScala.map(_.asScala.map(_.toInt))).map { assignments =>
+        val reassignment = Option(newPartition.newAssignments).map(_.asScala.map(_.asScala.map(_.toInt))).map { assignments =>
           val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
           if (unknownBrokers.nonEmpty)
             throw new InvalidReplicaAssignmentException(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ecbbdb6..6a99db3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,8 +31,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
-import kafka.log.{Log, LogManager, TimestampOffset}
-import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
+import kafka.message.ZStdCompressionCodec
 import kafka.network.RequestChannel
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
@@ -49,6 +48,7 @@ import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -64,7 +64,6 @@ import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 
 /**
  * Logic to handle the various Kafka requests
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ae62736..a2f1049 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -324,7 +324,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createPartitionsRequest = {
     new CreatePartitionsRequest.Builder(
-      Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true
+      Map(topic -> new CreatePartitionsRequest.PartitionDetails(10)).asJava, 10000, true
     ).build()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3604385..f2d3b4a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,7 +22,6 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
@@ -342,7 +341,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.CREATE_PARTITIONS =>
           new CreatePartitionsRequest.Builder(
-            Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 0, false
+            Collections.singletonMap("topic-2", new CreatePartitionsRequest.PartitionDetails(1)), 0, false
           )
 
         case ApiKeys.CREATE_DELEGATION_TOKEN =>
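
For context, a minimal sketch (not part of the commit) of how a CreatePartitionsRequest can now be
built entirely from the `common.requests` package, mirroring the updated AuthorizerIntegrationTest
and RequestQuotaTest call sites. The class name PartitionDetailsExample and the topic name are
illustrative only, and the example assumes the kafka-clients artifact containing this change is on
the classpath:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.common.requests.CreatePartitionsRequest;
    import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;

    public class PartitionDetailsExample {
        public static void main(String[] args) {
            // Ask for "my-topic" to be grown to 3 partitions; passing no assignments
            // leaves newAssignments null so the broker chooses replica placement.
            Map<String, PartitionDetails> details =
                    Collections.singletonMap("my-topic", new PartitionDetails(3));

            // Builder(newPartitions, timeoutMs, validateOnly), as in the updated tests.
            CreatePartitionsRequest request =
                    new CreatePartitionsRequest.Builder(details, 10000, false).build();

            // Prints {my-topic=(totalCount=3, newAssignments=null)} via PartitionDetails.toString().
            System.out.println(request.newPartitions());
        }
    }

On the admin-client side, KafkaAdminClient continues to accept org.apache.kafka.clients.admin.NewPartitions
and converts it with the new partitionDetails() helper, so the dependency now points from `clients`
to `common.requests` rather than the other way around.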