It seems all that is needed to avoid this problem, is to delete the return 
statement in line 141 … at least in my local version with this change I can 
re-start.

Chris

Von: Christofer Dutz <christofer.d...@c-ware.de>
Datum: Dienstag, 13. August 2024 um 13:43
An: dev@iotdb.apache.org <dev@iotdb.apache.org>
Betreff: AW: When restarting an IoTDB with pre-existing pipeline subscriptions, 
the system doesn't correctly startup
Ok … solved the mystery

It’s a bug.

In SubscriptionConsumerAgent. handleConsumerGroupMetaChanges there is a for 
loop to iterate over all subscriptions.
I can see that both are in there … however the code calls return inside the 
for-loop and hereby exists after the first successful subscription restore.
This should probably be a “continue” instead of a “return” statement.

Chris


Von: Christofer Dutz <christofer.d...@c-ware.de>
Datum: Dienstag, 13. August 2024 um 13:23
An: dev@iotdb.apache.org <dev@iotdb.apache.org>
Betreff: AW: When restarting an IoTDB with pre-existing pipeline subscriptions, 
the system doesn't correctly startup
Hi all,

I think I’ve spotted something … so in my application I’m adding two 
subscriptions. One for energy data and one for weather data …
The energyData subscription seems to be registered first and then the 
weatherData … if I restart the consumerGroupIdToSubscriptionBroker map only 
seems to contain the first registered group.

Could this possibly be the cause? After looking into the issue, indeed I could 
confirm that all warning messages seem to only relate to the weatherData and 
the energyData seems to work just fine.

Chris

Von: Christofer Dutz <christofer.d...@c-ware.de>
Datum: Dienstag, 13. August 2024 um 13:11
An: dev@iotdb.apache.org <dev@iotdb.apache.org>
Betreff: AW: When restarting an IoTDB with pre-existing pipeline subscriptions, 
the system doesn't correctly startup
Ok … using a full-text search and by setting breakpoints in every occasion that 
could produce the log message I narrowed it down to this code:

The first one is logged by this code:


public void bindPrefetchingQueue(final SubscriptionConnectorSubtask subtask) {
  final String consumerGroupId = subtask.getConsumerGroupId();
  final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
  if (Objects.isNull(broker)) {
    LOGGER.warn(
        "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
    return;
  }
  broker.bindPrefetchingQueue(subtask.getTopicName(), 
subtask.getInputPendingQueue());
}

After that all seem to be coming from here:

public void executePrefetch(final String consumerGroupId, final String 
topicName) {
  final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
  if (Objects.isNull(broker)) {
    LOGGER.warn(
        "Subscription: broker bound to consumer group [{}] does not exist", 
consumerGroupId);
    return;
  }
  broker.executePrefetch(topicName);
}

So, broker is always null and the method always directly returns … in 
PipeSubtask:


@Override
public Boolean call() throws Exception {
  boolean hasAtLeastOneEventProcessed = false;

  try {
    // If the scheduler allows to schedule, then try to consume an event
    while (subtaskScheduler.schedule()) {
      // If the event is consumed successfully, then continue to consume the 
next event
      // otherwise, stop consuming
      if (!executeOnce()) {
        break;
      }
      hasAtLeastOneEventProcessed = true;
    }
  } finally {
    // Reset the scheduler to make sure that the scheduler can schedule again
    subtaskScheduler.reset();
  }

  return hasAtLeastOneEventProcessed;
}

It's always treated as successful operation.

Not sure it should be this way.

Chris

Von: Christofer Dutz <christofer.d...@c-ware.de>
Datum: Dienstag, 13. August 2024 um 12:42
An: dev@iotdb.apache.org <dev@iotdb.apache.org>
Betreff: When restarting an IoTDB with pre-existing pipeline subscriptions, the 
system doesn't correctly startup
Hi all,

I am using my Home-Automation experiment in parallel to my work in order to 
learn and play with the cool features of IoTDB.

Here I have built a little Application, that uses PLC4X to subscribe to my KNX 
network and pump all data into an IoTDB instance running embedded.

The thing I’m currently playing around with are the pipelines and the 
subscription client.

So, when I start the system for the first time all is good … however as soon as 
I restart the system, IoTDB floods my logger with so many:

14015 [pool-60-IoTDB-Subscription-Executor-Pool-5] WARN  
o.a.i.d.s.a.SubscriptionBrokerAgent - Subscription: broker bound to consumer 
group [weatherServiceGroup] does not exist

Messages, that I simply cannot spot the initial problem in my log.

Is there anything special that needs to be done when starting a system with 
pre-existing subscriptions? Is it also possible to define them as “transient”? 
I don’t really need IoTDB to buffer results till my application comes back up 
(All I want is the changed values whenever something changes).

Chris

Reply via email to