Hi All,
Here is a brief idea of how to implement "Subscribe to topics
represented by regular expressions". Would you please help review it and
provide your feedback/comments?

-----------------------------
https://gist.github.com/zhaijack/dc0ef5e94febdc2acada9dd4223667cf
[Above is a gist link, for easier reading]
------------------------------

# PIP-12: Subscribe to topics represented by regular expressions

 * **Status**: Proposal
 * **Author**: Jia Zhai - Streamlio
 * **Pull Request**: []
 * **Mailing List discussion**:


## Motivation
The consumer needs to handle subscription to topics represented by
regular expressions. In the first stage the scope is a `namespace`: all
topics (and any pattern) must target the same namespace, which makes
authentication and authorization control easier.

Ultimately, we should add and implement a series of new methods in
`PulsarClient.java`:
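```java
// Subscribe to an explicit list of topics that all live in `namespace`.
Consumer subscribe(String namespace, Collection<String> topicsNameWithoutNs, String subscription);
// Subscribe to every topic in `namespace` whose name matches `topicsPattern`.
Consumer subscribe(String namespace, Pattern topicsPattern, String subscription);
```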
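The first method takes topic names without their namespace, so the client has to rebuild full topic names before creating per-topic consumers. The sketch below shows one way to do that, assuming the `persistent://<namespace>/<topic>` naming scheme; the `TopicNames.qualify` helper is hypothetical and not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class TopicNames {
    // Hypothetical helper: turn short topic names into fully qualified names,
    // assuming the "persistent://<namespace>/<topic>" naming scheme.
    static List<String> qualify(String namespace, Collection<String> topicsNameWithoutNs) {
        List<String> result = new ArrayList<>();
        for (String name : topicsNameWithoutNs) {
            // Reject names that carry their own namespace: every topic must
            // live in the single namespace passed to subscribe().
            if (name.contains("/")) {
                throw new IllegalArgumentException("Expected a topic name without namespace: " + name);
            }
            result.add("persistent://" + namespace + "/" + name);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints the two fully qualified topic names.
        System.out.println(qualify("my-property/us-west/my-ns", List.of("orders", "payments")));
    }
}
```

Rejecting names that contain `/` is one simple way to enforce the single-namespace rule from the Motivation section.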

The goals to be achieved are listed below; we can achieve them
one by one:
- support subscription to multiple topics in the same namespace (no
  guarantee on ordering between topics)
- support regex-based subscription
- auto-discover topic addition/deletion

## Design

### support subscription to multiple topics
This needs a new implementation of `ConsumerBase` that wraps
multiple single-topic consumers; let's name it `TopicsConsumerImpl`.
When the user calls the new method
`Consumer subscribe(String namespace, Collection<String> topicsNameWithoutNs, String subscription);`,
it will create a new `ConsumerImpl` for each topic and return a
`TopicsConsumerImpl` that wraps them. The main work is:

1. This `TopicsConsumerImpl` class should implement the
abstract methods in `ConsumerBase`, and should also provide some specific
methods such as:
```java
// Map of <topic, consumer>, populated after we subscribe to all the topics.
private final ConcurrentMap<String, ConsumerImpl> consumers = new ConcurrentHashMap<>();

// Get the topics this consumer is subscribed to.
Set<String> getTopics();
// Get the underlying single-topic consumers.
List<ConsumerImpl> getConsumers();

// Subscribe to a topic.
void subscribeTopic(String topic);
// Unsubscribe from a topic.
void unSubscribeTopic(String topic);
```

2. When receiving or acknowledging messages, the message identity is needed. In the
implementation, we need to handle message identity (`MessageId`) differently
for some of the abstract methods in `ConsumerBase`, because we have to
extend `MessageId` with an additional `String topic` or `consumer id`; alternatively, we
may need to change `MessageIdData` in `PulsarApi.proto`.
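The two points above can be sketched together: a wrapper keeps the `<topic, consumer>` map and routes each acknowledgment to the right underlying consumer using a message id that also records its topic. This is a minimal, dependency-free sketch; `SingleTopicConsumer` and `TopicMessageId` are hypothetical stand-ins for `ConsumerImpl` and an extended `MessageId`, and the single `entryId` is a simplification of Pulsar's real message id fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TopicsConsumerSketch {
    // Hypothetical stand-in for a single-topic ConsumerImpl; it only records acked entry ids.
    static final class SingleTopicConsumer {
        final String topic;
        final List<Long> acked = new ArrayList<>();

        SingleTopicConsumer(String topic) {
            this.topic = topic;
        }

        synchronized void acknowledge(long entryId) {
            acked.add(entryId);
        }
    }

    // Hypothetical topic-qualified message id: the per-topic id plus the topic it came from.
    static final class TopicMessageId {
        final String topic;
        final long entryId;

        TopicMessageId(String topic, long entryId) {
            this.topic = topic;
            this.entryId = entryId;
        }
    }

    // The <topic, consumer> map proposed for TopicsConsumerImpl.
    private final ConcurrentMap<String, SingleTopicConsumer> consumers = new ConcurrentHashMap<>();

    // Create a consumer for the topic only if we are not already subscribed.
    void subscribeTopic(String topic) {
        consumers.computeIfAbsent(topic, SingleTopicConsumer::new);
    }

    void unSubscribeTopic(String topic) {
        consumers.remove(topic);
    }

    // Snapshot of the currently subscribed topics.
    Set<String> getTopics() {
        return Set.copyOf(consumers.keySet());
    }

    SingleTopicConsumer getConsumer(String topic) {
        return consumers.get(topic);
    }

    // Route an ack to the underlying consumer that owns the topic recorded in the id.
    void acknowledge(TopicMessageId id) {
        SingleTopicConsumer consumer = consumers.get(id.topic);
        if (consumer == null) {
            throw new IllegalStateException("Not subscribed to topic " + id.topic);
        }
        consumer.acknowledge(id.entryId);
    }
}
```

Carrying the topic inside the id keeps the ack path a single map lookup; changing `MessageIdData` in `PulsarApi.proto` would be the alternative if the topic must travel over the wire.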



### support regex based subscription
As mentioned before, the scope is `namespace`. The main work is:

1. The `TopicsConsumerImpl` class above needs to keep the `Pattern`
that was passed in through the API for subscription.
2. Leverage the existing Pulsar admin API `getList` to get the list of topics.
In `interface PersistentTopics`:
```java
List<String> getList(String namespace) throws PulsarAdminException;
List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException;
```

3. The new method `Consumer subscribe(String namespace,
Pattern topicsPattern, String subscription)` should work like this:
- call `List<String> getList(String namespace)` to get all the topics;
- use `topicsPattern` to filter out the matching sub-topics-list;
- construct the `TopicsConsumerImpl` with the sub-topics-list.
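The filtering step can be sketched with the standard `java.util.regex` API. This assumes the pattern is matched against the full topic names returned by `getList` and must match the whole name (`Matcher.matches()`); using `find()` instead would allow partial matches, so that choice should be settled in the design. `TopicPatternFilter` is a hypothetical name.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternFilter {
    // Keep only the topics whose full name matches the pattern.
    // matches() requires the whole topic name to match (an assumption of this sketch).
    static List<String> filter(List<String> allTopics, Pattern topicsPattern) {
        return allTopics.stream()
                .filter(topic -> topicsPattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

For example, `Pattern.compile("persistent://p/c/ns/orders-.*")` selects `orders-1` and `orders-2` from a namespace that also contains `payments`.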

### auto-discover topic addition/deletion
The main work is:
1. Provide a listener that reacts to topic changes, subscribing to or
unsubscribing from an individual topic when a target topic is
added or removed.
```java
interface TopicsChangeListener {
    // Unsubscribe and delete the ConsumerImpl entries in the `consumers` map
    // of `TopicsConsumerImpl` for the removed topics.
    void onTopicsRemoved(Collection<String> topics);
    // Subscribe and create new ConsumerImpl entries for the added topics,
    // adding them to the `consumers` map of `TopicsConsumerImpl`.
    void onTopicsAdded(Collection<String> topics);
}
```
Add a method `void registerListener(TopicsChangeListener listener)` to
`TopicsConsumerImpl`.

2. Building on the above, use a timer to periodically call `List<String>
getList(String namespace)`. Compare the filtered fresh sub-topics-list
with the current topics held in `TopicsConsumerImpl` to get two
lists: `newAddedTopicsList` and `removedTopicsList`.
3. If either list is non-empty, call
`TopicsChangeListener.onTopicsAdded(newAddedTopicsList)` and/or
`TopicsChangeListener.onTopicsRemoved(removedTopicsList)` to
subscribe and unsubscribe, and update the `consumers` map in
`TopicsConsumerImpl`.
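Steps 2 and 3 amount to a periodic set difference. The sketch below shows one polling round plus a `ScheduledExecutorService` timer, assuming a `Supplier` that returns the already-filtered topic list (for example, `getList(namespace)` followed by the pattern filter). `TopicsWatcher`, `checkOnce`, and `start` are hypothetical names; only `TopicsChangeListener` comes from the proposal.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class TopicsWatcher {
    // Listener interface as proposed above.
    interface TopicsChangeListener {
        void onTopicsRemoved(Collection<String> topics);
        void onTopicsAdded(Collection<String> topics);
    }

    private final Set<String> currentTopics = new HashSet<>();
    // Assumed source of fresh, already pattern-filtered topic names.
    private final Supplier<List<String>> fetchMatchingTopics;
    private final TopicsChangeListener listener;

    TopicsWatcher(Collection<String> initialTopics,
                  Supplier<List<String>> fetchMatchingTopics,
                  TopicsChangeListener listener) {
        this.currentTopics.addAll(initialTopics);
        this.fetchMatchingTopics = fetchMatchingTopics;
        this.listener = listener;
    }

    // One polling round: diff the fresh list against the current one,
    // notify the listener about non-empty differences, then update state.
    synchronized void checkOnce() {
        Set<String> fresh = new HashSet<>(fetchMatchingTopics.get());
        Set<String> removed = new HashSet<>(currentTopics);
        removed.removeAll(fresh);
        Set<String> added = new HashSet<>(fresh);
        added.removeAll(currentTopics);
        if (!removed.isEmpty()) {
            listener.onTopicsRemoved(removed);
        }
        if (!added.isEmpty()) {
            listener.onTopicsAdded(added);
        }
        currentTopics.clear();
        currentTopics.addAll(fresh);
    }

    // Run checkOnce() periodically on a single background thread.
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel future runs; swallow it
                // so a failed lookup is simply retried on the next tick.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return timer;
    }
}
```

The try/catch matters because `scheduleAtFixedRate` suppresses all subsequent executions once a run throws.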

## Changes
The changes will be mostly on the API surface and on the client side:
1. Add and implement a series of new methods in
`org.apache.pulsar.client.api.PulsarClient.java`:
```java
Consumer subscribe(String namespace, Collection<String> topicsNameWithoutNs, String subscription);
Consumer subscribe(String namespace, Pattern topicsPattern, String subscription);
```
2. Add and implement a new `Consumer`, `TopicsConsumerImpl`,
which is returned by the above `subscribe` methods.


-------------------------------

Thanks a lot.
-Jia
