Hi, Chris You went from being a user of the subscriptions feature to a maintainer fixing bugs in just a few hours. That's pretty cool.
Best ------------------ Xinyu Tan On 2024/08/13 12:39:45 Christofer Dutz wrote: > It seems all that is needed to avoid this problem, is to delete the return > statement in line 141 … at least in my local version with this change I can > re-start. > > Chris > > Von: Christofer Dutz <[email protected]> > Datum: Dienstag, 13. August 2024 um 13:43 > An: [email protected] <[email protected]> > Betreff: AW: When restarting an IoTDB with pre-existing pipeline > subscriptions, the system doesn't correctly startup > Ok … solved the mystery > > It’s a bug. > > In SubscriptionConsumerAgent. handleConsumerGroupMetaChanges there is a for > loop to iterate over all subscriptions. > I can see that both are in there … however the code calls return inside the > for-loop and hereby exists after the first successful subscription restore. > This should probably be a “continue” instead of a “return” statement. > > Chris > > > Von: Christofer Dutz <[email protected]> > Datum: Dienstag, 13. August 2024 um 13:23 > An: [email protected] <[email protected]> > Betreff: AW: When restarting an IoTDB with pre-existing pipeline > subscriptions, the system doesn't correctly startup > Hi all, > > I think I’ve spotted something … so in my application I’m adding two > subscriptions. One for energy data and one for weather data … > The energyData subscription seems to be registered first and then the > weatherData … if I restart the consumerGroupIdToSubscriptionBroker map only > seems to contain the first registered group. > > Could this possibly be the cause? After looking into the issue, indeed I > could confirm that all warning messages seem to only relate to the > weatherData and the energyData seems to work just fine. > > Chris > > Von: Christofer Dutz <[email protected]> > Datum: Dienstag, 13. August 2024 um 13:11 > An: [email protected] <[email protected]> > Betreff: AW: When restarting an IoTDB with pre-existing pipeline > subscriptions, the system doesn't correctly startup > Ok … using a full-text search and by setting breakpoints in every occasion > that could produce the log message I narrowed it down to this code: > > The first one is logged by this code: > > > public void bindPrefetchingQueue(final SubscriptionConnectorSubtask subtask) { > final String consumerGroupId = subtask.getConsumerGroupId(); > final SubscriptionBroker broker = > consumerGroupIdToSubscriptionBroker.get(consumerGroupId); > if (Objects.isNull(broker)) { > LOGGER.warn( > "Subscription: broker bound to consumer group [{}] does not exist", > consumerGroupId); > return; > } > broker.bindPrefetchingQueue(subtask.getTopicName(), > subtask.getInputPendingQueue()); > } > > After that all seem to be coming from here: > > public void executePrefetch(final String consumerGroupId, final String > topicName) { > final SubscriptionBroker broker = > consumerGroupIdToSubscriptionBroker.get(consumerGroupId); > if (Objects.isNull(broker)) { > LOGGER.warn( > "Subscription: broker bound to consumer group [{}] does not exist", > consumerGroupId); > return; > } > broker.executePrefetch(topicName); > } > > So, broker is always null and the method always directly returns … in > PipeSubtask: > > > @Override > public Boolean call() throws Exception { > boolean hasAtLeastOneEventProcessed = false; > > try { > // If the scheduler allows to schedule, then try to consume an event > while (subtaskScheduler.schedule()) { > // If the event is consumed successfully, then continue to consume the > next event > // otherwise, stop consuming > if (!executeOnce()) { > break; > } > hasAtLeastOneEventProcessed = true; > } > } finally { > // Reset the scheduler to make sure that the scheduler can schedule again > subtaskScheduler.reset(); > } > > return hasAtLeastOneEventProcessed; > } > > It's always treated as successful operation. > > Not sure it should be this way. > > Chris > > Von: Christofer Dutz <[email protected]> > Datum: Dienstag, 13. August 2024 um 12:42 > An: [email protected] <[email protected]> > Betreff: When restarting an IoTDB with pre-existing pipeline subscriptions, > the system doesn't correctly startup > Hi all, > > I am using my Home-Automation experiment in parallel to my work in order to > learn and play with the cool features of IoTDB. > > Here I have built a little Application, that uses PLC4X to subscribe to my > KNX network and pump all data into an IoTDB instance running embedded. > > The thing I’m currently playing around with are the pipelines and the > subscription client. > > So, when I start the system for the first time all is good … however as soon > as I restart the system, IoTDB floods my logger with so many: > > 14015 [pool-60-IoTDB-Subscription-Executor-Pool-5] WARN > o.a.i.d.s.a.SubscriptionBrokerAgent - Subscription: broker bound to consumer > group [weatherServiceGroup] does not exist > > Messages, that I simply cannot spot the initial problem in my log. > > Is there anything special that needs to be done when starting a system with > pre-existing subscriptions? Is it also possible to define them as > “transient”? I don’t really need IoTDB to buffer results till my application > comes back up (All I want is the changed values whenever something changes). > > Chris >
