On Sun, 12 Feb 2023, 04:42 Yubiao Feng <yubiao.f...@streamnative.io.invalid>
wrote:

> Hi Enrico Olivelli
>
> > It is good to help users avoid bad situations, but on the other hand we
> > cannot deal with every silly configuration that you could set up,
> > like creating a pipeline of functions that in the end create a cycle.
>
> Sorry, this test just helps to reproduce the problem quickly. The reality
> is that there is only one consumer, but every restart triggers this issue
> and ends up with a topic like this:
> "persistent://public/default/tp1-sub1-RETRY-sub1-RETRY-sub1-RETRY...."
>
> > I wonder if we could simply document this fact instead of adding code
>
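> ```java
> // Regex consumer with retry enabled; restarting this client reproduces the problem.
> Consumer<byte[]> consumer = pulsarClient.newConsumer()
>         .topicsPattern("my-property/my-ns/.*")
>         .subscriptionName("sub1")
>         .enableRetry(true)
>         .subscribe();
> ```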
>
> With the client restarted, the code above will reproduce the problem.
>


I see the problem now.

We must do something about this case. It must not happen; we have to fix it.
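
As a rough illustration of the kind of guard this would need (a minimal
sketch, not the actual Pulsar change): skip topics that are already retry or
DLQ topics when a regex subscription resolves its matches. The class and
method names below are hypothetical, and the suffix values are assumptions
named after the RETRY_GROUP_TOPIC_SUFFIX and DLQ_GROUP_TOPIC_SUFFIX constants
used in the test further down this thread.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; not part of the Pulsar client API.
class RetryTopicFilter {
    // Assumed suffix values, named after the constants referenced in the test.
    static final String RETRY_GROUP_TOPIC_SUFFIX = "-RETRY";
    static final String DLQ_GROUP_TOPIC_SUFFIX = "-DLQ";

    // True if the topic name already ends with a retry or DLQ suffix.
    static boolean isRetryOrDlqTopic(String topicName) {
        return topicName.endsWith(RETRY_GROUP_TOPIC_SUFFIX)
                || topicName.endsWith(DLQ_GROUP_TOPIC_SUFFIX);
    }

    // Drops retry/DLQ topics from the list of topics matched by a pattern,
    // so that no retry topic would be derived from another retry topic.
    static List<String> withoutRetryAndDlqTopics(List<String> matchedTopics) {
        return matchedTopics.stream()
                .filter(t -> !isRetryOrDlqTopic(t))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> matched = List.of(
                "persistent://public/default/tp1",
                "persistent://public/default/tp1-sub1-RETRY",
                "persistent://public/default/tp1-sub1-DLQ");
        // Only the plain topic tp1 survives the filter.
        System.out.println(withoutRetryAndDlqTopics(matched));
    }
}
```

This only checks the end of the name, so partitioned topic names or custom
retry topic names would need extra handling.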

Thanks for your clarification

Enrico

>
> On Sun, Feb 12, 2023 at 3:31 AM Enrico Olivelli <eolive...@gmail.com>
> wrote:
>
> > Yubiao,
> >
> > On Sat, 11 Feb 2023, 19:06 Yubiao Feng <yubiao.f...@streamnative.io.invalid>
> > wrote:
> >
> > > Hi community
> > >
> > > I am starting a DISCUSS for "Retry topic should not create for a retry
> > > topic."
> > >
> > > If we use a regex-topic consumer and enable retry, it is possible to
> > > create a topic such as
> > > "persistent://public/default/tp1-sub1-RETRY-sub2-RETRY-sub3-RETRY....".
> > > You can reproduce this by using the test below.
> > >
> > > It probably doesn't make sense to create a RETRY/DLQ topic on a RETRY/DLQ
> > > topic. We should avoid this scenario when users use the default
> > > configuration (users can enable it if they need it).
> > >
> >
> > I agree that this is a bad case.
> > But should we really care?
> >
> > You must do it very intentionally.
> > It is good to help users avoid bad situations, but on the other hand we
> > cannot deal with every silly configuration that you could set up,
> > like creating a pipeline of functions that in the end create a cycle.
> >
> >
> > I wonder if we could simply document this fact instead of adding code
> >
> >
> > Enrico
> >
> >
> >
> >
> > > ```java
> > >     @Test
> > >     public void testRetryTopicWillNotCreatedForRetryTopic() throws Exception {
> > >         final String topic = "persistent://my-property/my-ns/tp1";
> > >         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
> > >                 .topic(topic)
> > >                 .create();
> > >         for (int i = 0; i < 100; i++) {
> > >             producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
> > >         }
> > >         producer.close();
> > >
> > >         for (int i = 0; i < 10; i++) {
> > >             Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
> > >                     .topicsPattern("my-property/my-ns/.*")
> > >                     .subscriptionName("sub" + i)
> > >                     .enableRetry(true)
> > >                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
> > >                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
> > >                     .subscribe();
> > >             Message<byte[]> message = consumer.receive();
> > >             log.info("consumer received message : {} {}",
> > >                     message.getMessageId(), new String(message.getData()));
> > >             consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
> > >             consumer.close();
> > >         }
> > >
> > >         Set<String> tps =
> > >                 pulsar.getBrokerService().getTopics().keys().stream().collect(Collectors.toSet());
> > >         try {
> > >             for (String tp : tps) {
> > >                 assertTrue(howManyKeyWordRetryInTopicName(tp, RETRY_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > >                 assertTrue(howManyKeyWordRetryInTopicName(tp, DLQ_GROUP_TOPIC_SUFFIX) <= 1, tp);
> > >             }
> > >         } finally {
> > >             // cleanup.
> > >             for (String tp : tps) {
> > >                 if (tp.startsWith(topic)) {
> > >                     admin.topics().delete(tp, true);
> > >                 }
> > >             }
> > >         }
> > >     }
> > >
> > >     private int howManyKeyWordRetryInTopicName(String topicName, String keyWord) {
> > >         int retryCountInTopicName = 0;
> > >         String tpCp = topicName;
> > >         while (true) {
> > >             int index = tpCp.indexOf(keyWord);
> > >             if (index < 0) {
> > >                 break;
> > >             }
> > >             tpCp = tpCp.substring(index + keyWord.length());
> > >             retryCountInTopicName++;
> > >         }
> > >         return retryCountInTopicName;
> > >     }
> > > ```
> > >
> > > Thanks
> > > Yubiao Feng
> > >
> >
>
