Hi, Chris

You went from being a user of the subscriptions feature to a maintainer fixing 
bugs in just a few hours. That's pretty cool.

Best
------------------
Xinyu Tan

On 2024/08/13 12:39:45 Christofer Dutz wrote:
> It seems all that is needed to avoid this problem, is to delete the return 
> statement in line 141 … at least in my local version with this change I can 
> re-start.
> 
> Chris
> 
> Von: Christofer Dutz <[email protected]>
> Datum: Dienstag, 13. August 2024 um 13:43
> An: [email protected] <[email protected]>
> Betreff: AW: When restarting an IoTDB with pre-existing pipeline 
> subscriptions, the system doesn't correctly startup
> Ok … solved the mystery
> 
> It’s a bug.
> 
> In SubscriptionConsumerAgent. handleConsumerGroupMetaChanges there is a for 
> loop to iterate over all subscriptions.
> I can see that both are in there … however the code calls return inside the 
> for-loop and hereby exists after the first successful subscription restore.
> This should probably be a “continue” instead of a “return” statement.
> 
> Chris
> 
> 
> Von: Christofer Dutz <[email protected]>
> Datum: Dienstag, 13. August 2024 um 13:23
> An: [email protected] <[email protected]>
> Betreff: AW: When restarting an IoTDB with pre-existing pipeline 
> subscriptions, the system doesn't correctly startup
> Hi all,
> 
> I think I’ve spotted something … so in my application I’m adding two 
> subscriptions. One for energy data and one for weather data …
> The energyData subscription seems to be registered first and then the 
> weatherData … if I restart the consumerGroupIdToSubscriptionBroker map only 
> seems to contain the first registered group.
> 
> Could this possibly be the cause? After looking into the issue, indeed I 
> could confirm that all warning messages seem to only relate to the 
> weatherData and the energyData seems to work just fine.
> 
> Chris
> 
> Von: Christofer Dutz <[email protected]>
> Datum: Dienstag, 13. August 2024 um 13:11
> An: [email protected] <[email protected]>
> Betreff: AW: When restarting an IoTDB with pre-existing pipeline 
> subscriptions, the system doesn't correctly startup
> Ok … using a full-text search and by setting breakpoints in every occasion 
> that could produce the log message I narrowed it down to this code:
> 
> The first one is logged by this code:
> 
> 
> public void bindPrefetchingQueue(final SubscriptionConnectorSubtask subtask) {
>   final String consumerGroupId = subtask.getConsumerGroupId();
>   final SubscriptionBroker broker = 
> consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
>   if (Objects.isNull(broker)) {
>     LOGGER.warn(
>         "Subscription: broker bound to consumer group [{}] does not exist", 
> consumerGroupId);
>     return;
>   }
>   broker.bindPrefetchingQueue(subtask.getTopicName(), 
> subtask.getInputPendingQueue());
> }
> 
> After that all seem to be coming from here:
> 
> public void executePrefetch(final String consumerGroupId, final String 
> topicName) {
>   final SubscriptionBroker broker = 
> consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
>   if (Objects.isNull(broker)) {
>     LOGGER.warn(
>         "Subscription: broker bound to consumer group [{}] does not exist", 
> consumerGroupId);
>     return;
>   }
>   broker.executePrefetch(topicName);
> }
> 
> So, broker is always null and the method always directly returns … in 
> PipeSubtask:
> 
> 
> @Override
> public Boolean call() throws Exception {
>   boolean hasAtLeastOneEventProcessed = false;
> 
>   try {
>     // If the scheduler allows to schedule, then try to consume an event
>     while (subtaskScheduler.schedule()) {
>       // If the event is consumed successfully, then continue to consume the 
> next event
>       // otherwise, stop consuming
>       if (!executeOnce()) {
>         break;
>       }
>       hasAtLeastOneEventProcessed = true;
>     }
>   } finally {
>     // Reset the scheduler to make sure that the scheduler can schedule again
>     subtaskScheduler.reset();
>   }
> 
>   return hasAtLeastOneEventProcessed;
> }
> 
> It's always treated as successful operation.
> 
> Not sure it should be this way.
> 
> Chris
> 
> Von: Christofer Dutz <[email protected]>
> Datum: Dienstag, 13. August 2024 um 12:42
> An: [email protected] <[email protected]>
> Betreff: When restarting an IoTDB with pre-existing pipeline subscriptions, 
> the system doesn't correctly startup
> Hi all,
> 
> I am using my Home-Automation experiment in parallel to my work in order to 
> learn and play with the cool features of IoTDB.
> 
> Here I have built a little Application, that uses PLC4X to subscribe to my 
> KNX network and pump all data into an IoTDB instance running embedded.
> 
> The thing I’m currently playing around with are the pipelines and the 
> subscription client.
> 
> So, when I start the system for the first time all is good … however as soon 
> as I restart the system, IoTDB floods my logger with so many:
> 
> 14015 [pool-60-IoTDB-Subscription-Executor-Pool-5] WARN  
> o.a.i.d.s.a.SubscriptionBrokerAgent - Subscription: broker bound to consumer 
> group [weatherServiceGroup] does not exist
> 
> Messages, that I simply cannot spot the initial problem in my log.
> 
> Is there anything special that needs to be done when starting a system with 
> pre-existing subscriptions? Is it also possible to define them as 
> “transient”? I don’t really need IoTDB to buffer results till my application 
> comes back up (All I want is the changed values whenever something changes).
> 
> Chris
> 

Reply via email to