This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6d42559742c [cleanup] PIP-468: Remove deprecated registerConsumer
overloads (#25712)
6d42559742c is described below
commit 6d42559742c723f5a60911f72b1643e005df830f
Author: Matteo Merli <[email protected]>
AuthorDate: Fri May 8 10:09:08 2026 -0700
[cleanup] PIP-468: Remove deprecated registerConsumer overloads (#25712)
---
.../service/scalable/ScalableTopicController.java | 13 -----------
.../service/scalable/ScalableTopicService.java | 12 ----------
.../scalable/ScalableTopicControllerTest.java | 27 ++++++++++++----------
.../service/scalable/ScalableTopicServiceTest.java | 3 ++-
4 files changed, 17 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 3bd562a48e1..2d6c61e2256 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -467,19 +467,6 @@ public class ScalableTopicController {
* registration (a subscription's type doesn't change in practice);
subsequent
* registers with a different type still work but won't change the
ordering policy.
*/
- /**
- * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
- * for backward compatibility. New callers should pass the explicit
type.
- */
- @Deprecated
- public CompletableFuture<ConsumerAssignment> registerConsumer(String
subscription,
- String
consumerName,
- long
consumerId,
-
TransportCnx cnx) {
- return registerConsumer(subscription, consumerName, consumerId,
- ScalableConsumerType.STREAM, cnx);
- }
-
public CompletableFuture<ConsumerAssignment> registerConsumer(String
subscription,
String
consumerName,
long
consumerId,
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
index 12c7064e844..56b049181ca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicService.java
@@ -271,18 +271,6 @@ public class ScalableTopicService {
consumerType, cnx));
}
- /**
- * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
- * for backward compatibility. New callers should pass the explicit
consumer type.
- */
- @Deprecated
- public CompletableFuture<ConsumerAssignment> registerConsumer(TopicName
topic, String subscription,
- String
consumerName, long consumerId,
-
org.apache.pulsar.broker.service.TransportCnx cnx) {
- return registerConsumer(topic, subscription, consumerName, consumerId,
- ScalableConsumerType.STREAM, cnx);
- }
-
/**
* Called when a scalable consumer's transport connection drops. Forwards
to the
* controller which marks the session disconnected and starts its grace
timer.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
index 8c75823ebef..9da30e662d4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ScalableTopics;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -218,7 +219,8 @@ public class ScalableTopicControllerTest {
() -> controller.createSubscription("sub",
SubscriptionType.STREAM));
assertThrows(IllegalStateException.class, () ->
controller.deleteSubscription("sub"));
assertThrows(IllegalStateException.class,
- () -> controller.registerConsumer("sub", "c1", 1L,
mock(TransportCnx.class)));
+ () -> controller.registerConsumer(
+ "sub", "c1", 1L, ScalableConsumerType.STREAM,
mock(TransportCnx.class)));
assertThrows(IllegalStateException.class, () ->
controller.unregisterConsumer("sub", "c1"));
}
@@ -227,8 +229,8 @@ public class ScalableTopicControllerTest {
@Test
public void testRegisterConsumerPersistsAndAssigns() throws Exception {
controller.initialize().get();
- ConsumerAssignment assignment =
- controller.registerConsumer("sub-a", "c1", 1L,
mock(TransportCnx.class)).get();
+ ConsumerAssignment assignment = controller.registerConsumer(
+ "sub-a", "c1", 1L, ScalableConsumerType.STREAM,
mock(TransportCnx.class)).get();
assertEquals(assignment.assignedSegments().size(), INITIAL_SEGMENTS,
"single consumer owns all active segments");
@@ -241,10 +243,11 @@ public class ScalableTopicControllerTest {
@Test
public void testRegisterConsumerReconnectDoesNotDuplicate() throws
Exception {
controller.initialize().get();
- controller.registerConsumer("sub-a", "c1", 1L,
mock(TransportCnx.class)).get();
+ controller.registerConsumer(
+ "sub-a", "c1", 1L, ScalableConsumerType.STREAM,
mock(TransportCnx.class)).get();
// Reconnect: same name, new consumerId.
- ConsumerAssignment assignment =
- controller.registerConsumer("sub-a", "c1", 99L,
mock(TransportCnx.class)).get();
+ ConsumerAssignment assignment = controller.registerConsumer(
+ "sub-a", "c1", 99L, ScalableConsumerType.STREAM,
mock(TransportCnx.class)).get();
assertEquals(assignment.assignedSegments().size(), INITIAL_SEGMENTS);
// Still just one persisted registration.
@@ -254,8 +257,8 @@ public class ScalableTopicControllerTest {
@Test
public void testUnregisterConsumerDeletesPersistedEntry() throws Exception
{
controller.initialize().get();
- controller.registerConsumer("sub-a", "c1", 1L,
mock(TransportCnx.class)).get();
- controller.registerConsumer("sub-a", "c2", 2L,
mock(TransportCnx.class)).get();
+ controller.registerConsumer("sub-a", "c1", 1L,
ScalableConsumerType.STREAM, mock(TransportCnx.class)).get();
+ controller.registerConsumer("sub-a", "c2", 2L,
ScalableConsumerType.STREAM, mock(TransportCnx.class)).get();
assertEquals(resources.listConsumersAsync(topicName,
"sub-a").get().size(), 2);
controller.unregisterConsumer("sub-a", "c1").get();
@@ -329,7 +332,7 @@ public class ScalableTopicControllerTest {
public void testDeleteSubscriptionRemovesInMemoryCoordinator() throws
Exception {
controller.initialize().get();
controller.createSubscription("sub-a", SubscriptionType.STREAM).get();
- controller.registerConsumer("sub-a", "c1", 1L,
mock(TransportCnx.class)).get();
+ controller.registerConsumer("sub-a", "c1", 1L,
ScalableConsumerType.STREAM, mock(TransportCnx.class)).get();
controller.deleteSubscription("sub-a").get();
// After delete, the persisted consumer entries should be gone.
@@ -377,7 +380,7 @@ public class ScalableTopicControllerTest {
@Test
public void testSplitSegmentPropagatesToRegisteredConsumer() throws
Exception {
controller.initialize().get();
- controller.registerConsumer("sub-a", "c1", 1L,
mock(TransportCnx.class)).get();
+ controller.registerConsumer("sub-a", "c1", 1L,
ScalableConsumerType.STREAM, mock(TransportCnx.class)).get();
SegmentLayout after = controller.splitSegment(0).get();
// consumer still owns everything after split (single consumer).
@@ -406,8 +409,8 @@ public class ScalableTopicControllerTest {
controller.splitSegment(0).get();
controller.createSubscription("sub-a", SubscriptionType.STREAM).get();
controller.createSubscription("sub-b", SubscriptionType.QUEUE).get();
- controller.registerConsumer("sub-a", "c1", 1L,
mock(TransportCnx.class)).get();
- controller.registerConsumer("sub-a", "c2", 2L,
mock(TransportCnx.class)).get();
+ controller.registerConsumer("sub-a", "c1", 1L,
ScalableConsumerType.STREAM, mock(TransportCnx.class)).get();
+ controller.registerConsumer("sub-a", "c2", 2L,
ScalableConsumerType.STREAM, mock(TransportCnx.class)).get();
ScalableTopicStats stats = controller.getStats().get();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
index 7f586ca6d01..725016f6e67 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicServiceTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.ScalableTopics;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ScalableTopicStats;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -303,7 +304,7 @@ public class ScalableTopicServiceTest {
TopicName tn = scalableTopic("t-reg");
service.createScalableTopic(tn, 2).get();
- ConsumerAssignment assignment = service.registerConsumer(tn, "sub-z",
"c1", 1L,
+ ConsumerAssignment assignment = service.registerConsumer(tn, "sub-z",
"c1", 1L, ScalableConsumerType.STREAM,
mock(TransportCnx.class)).get();
assertEquals(assignment.assignedSegments().size(), 2);
assertEquals(resources.listConsumersAsync(tn, "sub-z").get().size(),
1);