Hi -

I really like this feature. Let me confirm that I understand it correctly.

Suppose I have a system where I want to monitor the logs of an account, batch 
and/or job I have a topic path like:

/account/${account-key}/batch/${batchid}/job/${jobid}

I can use this feature to monitor the batch with:

/account/${account-key}/batch/${batchid}/job/*

I can use this feature to monitor the account with:

/account/${account-key}/batch/*/job/*

In either case I would add a listener that would notice new ${jobid} and 
${batchid} as these are introduced.

Regards,
Dave

> On Jan 16, 2018, at 9:49 PM, Jia Zhai <zhaiji...@gmail.com> wrote:
> 
> Hi All,
> Here is an brief idea of how to implementation "Subscribe to topics
> represented by regular expressions",  Would you please help go over and
> provide your feedback/comments?
> 
> -----------------------------
> https://gist.github.com/zhaijack/dc0ef5e94febdc2acada9dd4223667cf
> [Above it a gist link, for easy reading]
> ------------------------------
> 
> # PIP-12: Subscribe to topics represented by regular expressions
> 
> * **Status**: Proposal
> * **Author**: Jia Zhai - Streamlio
> * **Pull Request**: []
> * **Mailing List discussion**:
> 
> 
> ## Motivation
> The consumer needs to handle subscription to topics represented by
> regular expressions. The scope is `namespace` in first stage, all
> topics/patten should be targeted in same namespace, This will make
> easy authentication and authorization control.
> 
> At last, we should add and implementation a serials of new methods in
> `PulsarClient.java`
> ```java
> Consumer subscribe(String namespace, Collection<String>
> topicsNameWithoutNs, String subscription);
> Consumer subscribe(String namespace, Pattern topicsPattern, String
> subscription);
> ```
> 
> The goals the should be achieved are these below, we could achieve it
> one by one:
> - support subscription to multiple topics in the same namespace (no
> guarantee on ordering between topics)
> - support regex based subscription
> - auto-discover topic addition/deletion
> 
> ## Design
> 
> ### support subscription to multiple topics
> This will need a new implementation of `ConsumerBase` which wrapper
> over multiple single-topic-consumers, let’s name it as
> `TopicsConsumerImpl`.
> When user call new method
> `Consumer subscribe(String namespace, Collection<String>
> topicsNameWithoutNs, String subscription);`
> It will iteratively new a `ConsumerImpl` for each topic, and return a
> `TopicsConsumerImpl`. The main work is:
> 
> 1. This `TopicsConsumerImpl` class should provide implementation of
> abstract methods in `ConsumerBase`, Should also provide some specific
> methods such as:
> ```java
> // maintain a map for all the <Topic, Consumer>, after we subscribe
> all the topics.
> private final ConcurrentMap<String, ConsumerImpl> consumers = new
> ConcurrentHashMap<>();
> // get topics
> Set<String> getTopics();
> // get consumers
> List<ConsumerImpl> getConsumers();
> 
> // subscribe a topic
> void subscribeTopic(String topic);
> // unSubscribe a topic
> void unSubscribeTopic(String topic);
> ```
> 
> 2. While Message receive/ack, the message identify is needed. In the
> implementation, we need handle Message identify(MessageId) differently
> for some of the abstract methods in `ConsumerBase`, because we have to
> add `MessageId` with additional `String topic` or `consumer id`, Or we
> may need to change `MessageIdData` in `PulsarApi.proto`.
> 
> 
> 
> ### support regex based subscription.
> As mentioned before, the scope is `namespace`. The main work is:
> 
> 1. In above `TopicsConsumerImpl` class, need to keep the `Pattern`,
> which was passed in from api for subscription.
> 2. leverage currently pulsar admin API of `getList` to get a list of Topics.
> In `interface PersistentTopics `:
> ```java
> List<String> getList(String namespace) throws PulsarAdminException;
> List<String> getPartitionedTopicList(String namespace) throws
> PulsarAdminException;
> ```
> 
> 3. The process of new method `Consumer subscribe(String namespace,
> Pattern topicsPattern, String subscription)` should be like this:
> - call method `List<String> getList(String namespace)` to get all the topics;
> - Use `topicsPattern` to filter out the matched sub-topics-list.
> - construct the `TopicsConsumerImpl` with the the sub-topics-list.
> 
> ### auto-discover topic addition/deletion
> The main work is:
> 1. provide a listener, which based on topics changes, to do subscribe
> and unsubscribe on individual topic when target topic been
> changed(remove/add).
> ```java
> Interface TopicsChangeListener {
>       // unsubscribe and delete ConsumerImpl in the `consumers` map in
> `TopicsConsumerImpl` based on added topics.
>       void onTopicsRemoved(Collection<String> topics);
>       // subscribe and create a list of new ConsumerImpl, added them to the
> `consumers` map in `TopicsConsumerImpl`.
>       void onTopicsAdded(Collection<String> topics);
> }
> ```
> Add a method `void registerListener(TopicsChangeListener listener)` to
> `TopicsConsumerImpl`
> 
> 2. Based on above work, using a timer, periodically call `List<String>
> getList(String namespace)`. And comparing the filtered fresh
> sub-topics-list with current topics holden in `TopicsConsumerImpl`,
> try to get 2 lists: `newAddedTopicsList` and  `removedTopicsList`.
> 3. If the 2 lists not empty, call
> `TopicsChangeListener.onTopicsAdded(newAddedTopicsList)`, and
> `TopicsChangeListener.onTopicsRemoved(removedTopicsList)` to do
> subscribe and unsubscribe, and update `consumers` map in
> `TopicsConsumerImpl`.
> 
> # Changes
> The changes will be mostly on the surface and on client side:
> 1. add and implementation a serials of new methods in
> `org.apache.pulsar.client.api.PulsarClient.java`
> ```java
> Consumer subscribe(String namespace, Collection<String>
> topicsNameWithoutNs, String subscription);
> Consumer subscribe(String namespace, Pattern topicsPattern, String
> subscription);
> ```
> 2. add and implenentation of new `Consumer`, which is
> `TopicsConsumerImpl` , returned by above `subscribe` method
> 
> 
> -------------------------------
> 
> Thanks a lot.
> -Jia

Attachment: signature.asc
Description: Message signed with OpenPGP

Reply via email to