This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c602e6  KAFKA-7498: Remove references from `common.requests` to `clients` (#5784)
4c602e6 is described below

commit 4c602e6130e869abbd111d5d4efe1c2046935c6c
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Mon Oct 15 13:21:15 2018 +0100

    KAFKA-7498: Remove references from `common.requests` to `clients` (#5784)
    
    Add CreatePartitionsRequest.PartitionDetails similar to CreateTopicsRequest.TopicDetails to avoid references from `common.requests` package to `clients`.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 checkstyle/import-control.xml                      |  1 -
 .../kafka/clients/admin/KafkaAdminClient.java      |  9 +++-
 .../common/requests/CreatePartitionsRequest.java   | 63 ++++++++++++++++------
 .../kafka/common/requests/RequestResponseTest.java | 14 ++---
 .../src/main/scala/kafka/server/AdminManager.scala |  6 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  5 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  3 +-
 8 files changed, 68 insertions(+), 35 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7810a3e..91d23f6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -131,7 +131,6 @@
     </subpackage>
 
     <subpackage name="requests">
-      <allow pkg="org.apache.kafka.clients.admin" />
       <allow pkg="org.apache.kafka.common.acl" />
       <allow pkg="org.apache.kafka.common.protocol" />
       <allow pkg="org.apache.kafka.common.network" />
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c8418c1..b0120e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -80,6 +80,7 @@ import 
org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
 import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
 import org.apache.kafka.common.requests.CreatePartitionsRequest;
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -141,6 +142,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.common.utils.Utils.closeQuietly;
 
@@ -2083,7 +2085,8 @@ public class KafkaAdminClient extends AdminClient {
         for (String topic : newPartitions.keySet()) {
             futures.put(topic, new KafkaFutureImpl<>());
         }
-        final Map<String, NewPartitions> requestMap = new 
HashMap<>(newPartitions);
+        final Map<String, PartitionDetails> requestMap = 
newPartitions.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> 
partitionDetails(e.getValue())));
 
         final long now = time.milliseconds();
         runnable.call(new Call("createPartitions", calcDeadlineMs(now, 
options.timeoutMs()),
@@ -2482,6 +2485,10 @@ public class KafkaAdminClient extends AdminClient {
         return false;
     }
 
+    private PartitionDetails partitionDetails(NewPartitions newPartitions) {
+        return new PartitionDetails(newPartitions.totalCount(), 
newPartitions.assignments());
+    }
+
     private final static class ListConsumerGroupsResults {
         private final List<Throwable> errors;
         private final HashMap<String, ConsumerGroupListing> listings;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 795a66a..7872cf9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.ArrayOf;
 import org.apache.kafka.common.protocol.types.Field;
@@ -72,17 +71,47 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
     // It is an error for duplicate topics to be present in the request,
     // so track duplicates here to allow KafkaApis to report per-topic errors.
     private final Set<String> duplicates;
-    private final Map<String, NewPartitions> newPartitions;
+    private final Map<String, PartitionDetails> newPartitions;
     private final int timeout;
     private final boolean validateOnly;
 
+    public static class PartitionDetails {
+
+        private final int totalCount;
+
+        private final List<List<Integer>> newAssignments;
+
+        public PartitionDetails(int totalCount) {
+            this(totalCount, null);
+        }
+
+        public PartitionDetails(int totalCount, List<List<Integer>> 
newAssignments) {
+            this.totalCount = totalCount;
+            this.newAssignments = newAssignments;
+        }
+
+        public int totalCount() {
+            return totalCount;
+        }
+
+        public List<List<Integer>> newAssignments() {
+            return newAssignments;
+        }
+
+        @Override
+        public String toString() {
+            return "(totalCount=" + totalCount() + ", newAssignments=" + 
newAssignments() + ")";
+        }
+
+    }
+
     public static class Builder extends 
AbstractRequest.Builder<CreatePartitionsRequest> {
 
-        private final Map<String, NewPartitions> newPartitions;
+        private final Map<String, PartitionDetails> newPartitions;
         private final int timeout;
         private final boolean validateOnly;
 
-        public Builder(Map<String, NewPartitions> newPartitions, int timeout, 
boolean validateOnly) {
+        public Builder(Map<String, PartitionDetails> newPartitions, int 
timeout, boolean validateOnly) {
             super(ApiKeys.CREATE_PARTITIONS);
             this.newPartitions = newPartitions;
             this.timeout = timeout;
@@ -106,7 +135,7 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
         }
     }
 
-    CreatePartitionsRequest(Map<String, NewPartitions> newPartitions, int 
timeout, boolean validateOnly, short apiVersion) {
+    CreatePartitionsRequest(Map<String, PartitionDetails> newPartitions, int 
timeout, boolean validateOnly, short apiVersion) {
         super(ApiKeys.CREATE_PARTITIONS, apiVersion);
         this.newPartitions = newPartitions;
         this.duplicates = Collections.emptySet();
@@ -117,7 +146,7 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
     public CreatePartitionsRequest(Struct struct, short apiVersion) {
         super(ApiKeys.CREATE_PARTITIONS, apiVersion);
         Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
-        Map<String, NewPartitions> counts = new 
HashMap<>(topicCountArray.length);
+        Map<String, PartitionDetails> counts = new 
HashMap<>(topicCountArray.length);
         Set<String> dupes = new HashSet<>();
         for (Object topicPartitionCountObj : topicCountArray) {
             Struct topicPartitionCountStruct = (Struct) topicPartitionCountObj;
@@ -125,7 +154,7 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
             Struct partitionCountStruct = 
topicPartitionCountStruct.getStruct(NEW_PARTITIONS_KEY_NAME);
             int count = partitionCountStruct.getInt(COUNT_KEY_NAME);
             Object[] assignmentsArray = 
partitionCountStruct.getArray(ASSIGNMENT_KEY_NAME);
-            NewPartitions newPartition;
+            PartitionDetails newPartition;
             if (assignmentsArray != null) {
                 List<List<Integer>> assignments = new 
ArrayList<>(assignmentsArray.length);
                 for (Object replicas : assignmentsArray) {
@@ -136,11 +165,11 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
                         replicasList.add((Integer) broker);
                     }
                 }
-                newPartition = NewPartitions.increaseTo(count, assignments);
+                newPartition = new PartitionDetails(count, assignments);
             } else {
-                newPartition = NewPartitions.increaseTo(count);
+                newPartition = new PartitionDetails(count);
             }
-            NewPartitions dupe = counts.put(topic, newPartition);
+            PartitionDetails dupe = counts.put(topic, newPartition);
             if (dupe != null) {
                 dupes.add(topic);
             }
@@ -155,7 +184,7 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
         return duplicates;
     }
 
-    public Map<String, NewPartitions> newPartitions() {
+    public Map<String, PartitionDetails> newPartitions() {
         return newPartitions;
     }
 
@@ -171,17 +200,17 @@ public class CreatePartitionsRequest extends 
AbstractRequest {
     protected Struct toStruct() {
         Struct struct = new 
Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
         List<Struct> topicPartitionsList = new ArrayList<>();
-        for (Map.Entry<String, NewPartitions> topicPartitionCount : 
this.newPartitions.entrySet()) {
+        for (Map.Entry<String, PartitionDetails> topicPartitionCount : 
this.newPartitions.entrySet()) {
             Struct topicPartitionCountStruct = 
struct.instance(TOPIC_PARTITIONS_KEY_NAME);
             topicPartitionCountStruct.set(TOPIC_NAME, 
topicPartitionCount.getKey());
-            NewPartitions count = topicPartitionCount.getValue();
+            PartitionDetails partitionDetails = topicPartitionCount.getValue();
             Struct partitionCountStruct = 
topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
-            partitionCountStruct.set(COUNT_KEY_NAME, count.totalCount());
+            partitionCountStruct.set(COUNT_KEY_NAME, 
partitionDetails.totalCount());
             Object[][] assignments = null;
-            if (count.assignments() != null) {
-                assignments = new Object[count.assignments().size()][];
+            if (partitionDetails.newAssignments() != null) {
+                assignments = new 
Object[partitionDetails.newAssignments().size()][];
                 int i = 0;
-                for (List<Integer> partitionAssignment : count.assignments()) {
+                for (List<Integer> partitionAssignment : 
partitionDetails.newAssignments()) {
                     assignments[i] = partitionAssignment.toArray(new 
Object[0]);
                     i++;
                 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e34348a..b7f39ef 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
@@ -44,6 +43,7 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
 import org.apache.kafka.common.resource.PatternType;
@@ -1214,16 +1214,16 @@ public class RequestResponseTest {
     }
 
     private CreatePartitionsRequest createCreatePartitionsRequest() {
-        Map<String, NewPartitions> assignments = new HashMap<>();
-        assignments.put("my_topic", NewPartitions.increaseTo(3));
-        assignments.put("my_other_topic", NewPartitions.increaseTo(3));
+        Map<String, PartitionDetails> assignments = new HashMap<>();
+        assignments.put("my_topic", new PartitionDetails(3));
+        assignments.put("my_other_topic", new PartitionDetails(3));
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
     private CreatePartitionsRequest 
createCreatePartitionsRequestWithAssignments() {
-        Map<String, NewPartitions> assignments = new HashMap<>();
-        assignments.put("my_topic", NewPartitions.increaseTo(3, 
asList(asList(2))));
-        assignments.put("my_other_topic", NewPartitions.increaseTo(3, 
asList(asList(2, 3), asList(3, 1))));
+        Map<String, PartitionDetails> assignments = new HashMap<>();
+        assignments.put("my_topic", new PartitionDetails(3, 
asList(asList(2))));
+        assignments.put("my_other_topic", new PartitionDetails(3, 
asList(asList(2, 3), asList(3, 1))));
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 2b48170..4ec083a 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -24,13 +24,13 @@ import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import kafka.zk.{AdminZkClient, KafkaZkClient}
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource}
 import org.apache.kafka.common.errors.{ApiException, 
InvalidPartitionsException, InvalidReplicaAssignmentException, 
InvalidRequestException, ReassignmentInProgressException, 
UnknownTopicOrPartitionException, InvalidConfigurationException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
+import 
org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails
 import org.apache.kafka.common.requests.CreateTopicsRequest._
 import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, 
DescribeConfigsResponse}
@@ -204,7 +204,7 @@ class AdminManager(val config: KafkaConfig,
   }
 
   def createPartitions(timeout: Int,
-                       newPartitions: Map[String, NewPartitions],
+                       newPartitions: Map[String, PartitionDetails],
                        validateOnly: Boolean,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
@@ -237,7 +237,7 @@ class AdminManager(val config: KafkaConfig,
           throw new InvalidPartitionsException(s"Topic already has 
$oldNumPartitions partitions.")
         }
 
-        val reassignment = 
Option(newPartition.assignments).map(_.asScala.map(_.asScala.map(_.toInt))).map 
{ assignments =>
+        val reassignment = 
Option(newPartition.newAssignments).map(_.asScala.map(_.asScala.map(_.toInt))).map
 { assignments =>
           val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
           if (unknownBrokers.nonEmpty)
             throw new InvalidReplicaAssignmentException(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index ecbbdb6..6a99db3 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,8 +31,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, 
TransactionCoordinator}
-import kafka.log.{Log, LogManager, TimestampOffset}
-import kafka.message.{CompressionCodec, NoCompressionCodec, 
ZStdCompressionCodec}
+import kafka.message.ZStdCompressionCodec
 import kafka.network.RequestChannel
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
@@ -49,6 +48,7 @@ import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{BaseRecords, ControlRecordType, 
EndTransactionMarker, LazyDownConversionRecords, MemoryRecords, 
MultiRecordsSend, RecordBatch, RecordConversionStats, Records}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, 
AclFilterResponse}
 import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -64,7 +64,6 @@ import scala.collection.JavaConverters._
 import scala.collection._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
 
 /**
  * Logic to handle the various Kafka requests
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ae62736..a2f1049 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -324,7 +324,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createPartitionsRequest = {
     new CreatePartitionsRequest.Builder(
-      Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true
+      Map(topic -> new CreatePartitionsRequest.PartitionDetails(10)).asJava, 
10000, true
     ).build()
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 3604385..f2d3b4a 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -22,7 +22,6 @@ import kafka.log.LogConfig
 import kafka.network.RequestChannel.Session
 import kafka.security.auth._
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, 
AclPermissionType}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, 
ResourcePatternFilter, ResourceType => AdminResourceType}
@@ -342,7 +341,7 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.CREATE_PARTITIONS =>
           new CreatePartitionsRequest.Builder(
-            Collections.singletonMap("topic-2", NewPartitions.increaseTo(1)), 
0, false
+            Collections.singletonMap("topic-2", new 
CreatePartitionsRequest.PartitionDetails(1)), 0, false
           )
 
         case ApiKeys.CREATE_DELEGATION_TOKEN =>

Reply via email to