clolov commented on code in PR #18983:
URL: https://github.com/apache/kafka/pull/18983#discussion_r1965316204
##########
clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java:
##########
@@ -28,4 +28,25 @@
*/
@InterfaceStability.Evolving
public class AlterPartitionReassignmentsOptions extends
AbstractOptions<AlterPartitionReassignmentsOptions> {
+
+ private Boolean allowReplicationFactorChange = true;
+
+ /**
+ * Set the option indicating if the alter partition reassignments call
should be
+ * allowed to alter the replication factor of a partition.
+ * In cases where it is not allowed, any replication factor change will
result in an exception thrown by the API.
+ */
+ public boolean allowReplicationFactorChange(boolean allow) {
+ this.allowReplicationFactorChange = allow;
+ return allowReplicationFactorChange;
Review Comment:
On a quick cross-check with other *Options it appears that for such methods
we return the *Options class itself. In other words, something along these lines
```
public AlterPartitionReassignmentsOptions
allowReplicationFactorChange(boolean allow) {
this.allowReplicationFactorChange = allow;
return this;
}
```
##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) {
assertEquals(NONE_REASSIGNING,
replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void testAlterPartitionDisallowReplicationFactorChange(short
version) {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
+ ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}, new int[]
{1, 2, 3}, new int[] {1, 2, 3}});
+
+ ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(asList(
+ new
ReassignablePartition().setPartitionIndex(0).
+ setReplicas(asList(0, 1, 2)),
+ new
ReassignablePartition().setPartitionIndex(1).
+ setReplicas(asList(0, 1)),
+ new
ReassignablePartition().setPartitionIndex(2).
+ setReplicas(asList(0, 1, 2,
3)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(asList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null),
+ new
ReassignablePartitionResponse().setPartitionIndex(1).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 2"),
+ new
ReassignablePartitionResponse().setPartitionIndex(2).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 4"))))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(singletonList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(singletonList(
+ new
OngoingPartitionReassignment().setPartitionIndex(0).
+
setRemovingReplicas(singletonList(3)).
Review Comment:
Nit: Partition 0 started with replicas (0, 1, 2). For the sake of
continuity, could you remove 0 and add 3 rather than the other way around?
##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2406,6 +2414,31 @@ private void updatePartitionInfo(
newPartInfo.elr);
}
+ private void validatePartitionReplicationFactorUnchanged(
+ PartitionRegistration part,
Review Comment:
Nit: Could you align the first argument with the opening bracket as other
methods in this file?
##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) {
assertEquals(NONE_REASSIGNING,
replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void testAlterPartitionDisallowReplicationFactorChange(short
version) {
Review Comment:
Maybe there already are such tests, in which case please point them out to
me, but if not, could you add the same tests but with
`setAllowReplicationFactorChange(true)`?
##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) {
assertEquals(NONE_REASSIGNING,
replication.listPartitionReassignments(null, Long.MAX_VALUE));
}
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void testAlterPartitionDisallowReplicationFactorChange(short
version) {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
+ ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}, new int[]
{1, 2, 3}, new int[] {1, 2, 3}});
+
+ ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(asList(
+ new
ReassignablePartition().setPartitionIndex(0).
+ setReplicas(asList(0, 1, 2)),
+ new
ReassignablePartition().setPartitionIndex(1).
+ setReplicas(asList(0, 1)),
+ new
ReassignablePartition().setPartitionIndex(2).
+ setReplicas(asList(0, 1, 2,
3)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(asList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null),
+ new
ReassignablePartitionResponse().setPartitionIndex(1).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 2"),
+ new
ReassignablePartitionResponse().setPartitionIndex(2).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 4"))))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(singletonList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(singletonList(
+ new
OngoingPartitionReassignment().setPartitionIndex(0).
+
setRemovingReplicas(singletonList(3)).
+
setAddingReplicas(singletonList(0)).
+ setReplicas(asList(0, 1, 2,
3))))));
+ assertEquals(currentReassigning,
replication.listPartitionReassignments(singletonList(
+ new ListPartitionReassignmentsTopics().setName("foo").
+ setPartitionIndexes(asList(0, 1, 2))),
Long.MAX_VALUE));
+
+ // test alter replica factor not allow to change when partition
reassignment is ongoing
+ ControllerResult<AlterPartitionReassignmentsResponseData>
alterReassigningResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+
setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+ setErrorMessage("The
replication factor is changed from 3 to 2"))))),
+ alterReassigningResult.response());
+
+ ControllerResult<AlterPartitionReassignmentsResponseData>
alterReassigningResult2 =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))).
+ setAllowReplicationFactorChange(false));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+
setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).
+ setErrorMessage(null))))),
+ alterReassigningResult2.response());
+ }
+
+ @ParameterizedTest
+ @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+ public void
testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short
version) {
+ MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+ ReplicationControlTestContext ctx = new
ReplicationControlTestContext.Builder()
+ .setMetadataVersion(metadataVersion)
+ .build();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3);
+ ctx.unfenceBrokers(0, 1, 2, 3);
+ ctx.createTestTopic("foo", new int[][] {new int[] {1, 2,
3}}).topicId();
+
+ ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+ replication.alterPartitionReassignments(
+ new
AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+ new
ReassignableTopic().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartition().setPartitionIndex(0).
+ setReplicas(asList(0,
1, 2)))))));
+ assertEquals(new AlterPartitionReassignmentsResponseData().
+ setErrorMessage(null).setResponses(singletonList(
+ new
ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+ new
ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+ alterResult.response());
+ ctx.replay(alterResult.records());
+ ListPartitionReassignmentsResponseData currentReassigning =
+ new
ListPartitionReassignmentsResponseData().setErrorMessage(null).
+ setTopics(singletonList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(singletonList(
+ new
OngoingPartitionReassignment().setPartitionIndex(0).
+
setRemovingReplicas(singletonList(3)).
Review Comment:
Nit: Same comment as in the previous test
##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java:
##########
@@ -432,6 +433,21 @@ topicPartition, new PartitionReassignmentState(asList(0,
1, 2), asList(0, 1, 2),
}
}
+ @ClusterTest
+ public void testDisallowReplicationFactorChange() {
Review Comment:
Could you also add a test (or build on this one) which showcases that
**increasing** the replication factor also fails?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]