This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 5b7afab MINOR: Rework NewPartitionReassignment public API (#7638)
5b7afab is described below
commit 5b7afab9cc5d04e63efcdf950fe2f2f9c3efacd6
Author: Stanislav Kozlovski <[email protected]>
AuthorDate: Tue Nov 5 18:34:11 2019 +0000
MINOR: Rework NewPartitionReassignment public API (#7638)
This patch removes the NewPartitionReassignment#of() method in favor of a
simple constructor. Said method was confusing due to breaking two conventions -
always returning a non-empty Optional and thus not being used as a static
factory method.
Reviewers: Ismael Juma <[email protected]>, Colin P. McCabe
<[email protected]>
(cherry picked from commit be58580e14be93618f11e609389ff6bb16317702)
---
.../apache/kafka/clients/admin/NewPartitionReassignment.java | 9 ++-------
.../org/apache/kafka/clients/admin/KafkaAdminClientTest.java | 8 ++++----
.../integration/kafka/api/AdminClientIntegrationTest.scala | 12 ++++++------
.../unit/kafka/admin/ReassignPartitionsClusterTest.scala | 4 ++--
4 files changed, 14 insertions(+), 19 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
index 111514a..f9a7008 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitionReassignment.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/**
* A new partition reassignment, which can be applied via {@link
AdminClient#alterPartitionReassignments(Map,
AlterPartitionReassignmentsOptions)}.
@@ -32,13 +31,9 @@ public class NewPartitionReassignment {
/**
* @throws IllegalArgumentException if no replicas are supplied
*/
- public static Optional<NewPartitionReassignment> of(List<Integer>
replicas) {
- if (replicas == null || replicas.size() == 0)
+ public NewPartitionReassignment(List<Integer> targetReplicas) {
+ if (targetReplicas == null || targetReplicas.size() == 0)
throw new IllegalArgumentException("Cannot create a new partition
reassignment without any replicas");
- return Optional.of(new NewPartitionReassignment(replicas));
- }
-
- private NewPartitionReassignment(List<Integer> targetReplicas) {
this.targetReplicas = Collections.unmodifiableList(new
ArrayList<>(targetReplicas));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 9fd0c1c..df28f00 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -2000,7 +2000,7 @@ public class KafkaAdminClientTest {
TopicPartition tp2 = new TopicPartition("B", 0);
Map<TopicPartition, Optional<NewPartitionReassignment>>
reassignments = new HashMap<>();
reassignments.put(tp1, Optional.empty());
- reassignments.put(tp2,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+ reassignments.put(tp2, Optional.of(new
NewPartitionReassignment(Arrays.asList(1, 2, 3))));
// 1. server returns less responses than number of partitions we
sent
AlterPartitionReassignmentsResponseData responseData1 = new
AlterPartitionReassignmentsResponseData();
@@ -2091,9 +2091,9 @@ public class KafkaAdminClientTest {
TopicPartition invalidTopicTP = new TopicPartition("", 0);
TopicPartition invalidPartitionTP = new TopicPartition("ABC", -1);
Map<TopicPartition, Optional<NewPartitionReassignment>>
invalidTopicReassignments = new HashMap<>();
- invalidTopicReassignments.put(invalidPartitionTP,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
- invalidTopicReassignments.put(invalidTopicTP,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
- invalidTopicReassignments.put(tp1,
NewPartitionReassignment.of(Arrays.asList(1, 2, 3)));
+ invalidTopicReassignments.put(invalidPartitionTP, Optional.of(new
NewPartitionReassignment(Arrays.asList(1, 2, 3))));
+ invalidTopicReassignments.put(invalidTopicTP, Optional.of(new
NewPartitionReassignment(Arrays.asList(1, 2, 3))));
+ invalidTopicReassignments.put(tp1, Optional.of(new
NewPartitionReassignment(Arrays.asList(1, 2, 3))));
AlterPartitionReassignmentsResponseData singlePartResponseData =
new AlterPartitionReassignmentsResponseData()
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 87cc42f..ad0a78e 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -22,7 +22,7 @@ import java.time.{Duration => JDuration}
import java.util.Arrays.asList
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
import java.{time, util}
import kafka.log.LogConfig
@@ -1996,9 +1996,9 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
createTopic(topic, numPartitions = 4)
- val validAssignment = NewPartitionReassignment.of(
+ val validAssignment = Optional.of(new NewPartitionReassignment(
(0 until brokerCount).map(_.asInstanceOf[Integer]).asJava
- )
+ ))
val nonExistentTp1 = new TopicPartition("topicA", 0)
val nonExistentTp2 = new TopicPartition(topic, 4)
@@ -2012,9 +2012,9 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp1),
classOf[UnknownTopicOrPartitionException])
assertFutureExceptionTypeEquals(nonExistentPartitionsResult.get(nonExistentTp2),
classOf[UnknownTopicOrPartitionException])
- val extraNonExistentReplica = NewPartitionReassignment.of((0 until
brokerCount + 1).map(_.asInstanceOf[Integer]).asJava)
- val negativeIdReplica = NewPartitionReassignment.of(Seq(-3, -2,
-1).map(_.asInstanceOf[Integer]).asJava)
- val duplicateReplica = NewPartitionReassignment.of(Seq(0, 1,
1).map(_.asInstanceOf[Integer]).asJava)
+ val extraNonExistentReplica = Optional.of(new NewPartitionReassignment((0
until brokerCount + 1).map(_.asInstanceOf[Integer]).asJava))
+ val negativeIdReplica = Optional.of(new NewPartitionReassignment(Seq(-3,
-2, -1).map(_.asInstanceOf[Integer]).asJava))
+ val duplicateReplica = Optional.of(new NewPartitionReassignment(Seq(0, 1,
1).map(_.asInstanceOf[Integer]).asJava))
val invalidReplicaResult = client.alterPartitionReassignments(Map(
tp1 -> extraNonExistentReplica,
tp2 -> negativeIdReplica,
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
index 5331559..9312be3 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
import scala.collection.{Map, Seq}
import scala.util.Random
import java.io.File
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
import java.util.concurrent.ExecutionException
import kafka.controller.ReplicaAssignment
@@ -1261,7 +1261,7 @@ class ReassignPartitionsClusterTest extends
ZooKeeperTestHarness with Logging {
}.mkString(",")
def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]):
(TopicPartition, java.util.Optional[NewPartitionReassignment]) =
- tp ->
NewPartitionReassignment.of(replicas.map(_.asInstanceOf[Integer]).asJava)
+ tp -> Optional.of(new
NewPartitionReassignment((replicas.map(_.asInstanceOf[Integer]).asJava)))
def cancelReassignmentEntry(tp: TopicPartition): (TopicPartition,
java.util.Optional[NewPartitionReassignment]) =
tp -> java.util.Optional.empty()