Github user brosander commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/51#discussion_r87245214 --- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinator.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.minifi.bootstrap.configuration; + +import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder; +import org.apache.nifi.minifi.bootstrap.configuration.ingestors.interfaces.ChangeIngestor; +import org.apache.nifi.minifi.bootstrap.util.ByteBufferInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +public class ConfigurationChangeCoordinator implements Closeable, ConfigurationChangeNotifier { + + public static final String NOTIFIER_PROPERTY_PREFIX = "nifi.minifi.notifier"; + public static final String NOTIFIER_INGESTORS_KEY = NOTIFIER_PROPERTY_PREFIX + ".ingestors"; + private final static Logger logger = LoggerFactory.getLogger(ConfigurationChangeCoordinator.class); + private final Set<ConfigurationChangeListener> configurationChangeListeners = new HashSet<>(); + private final Set<ChangeIngestor> changeIngestors = new HashSet<>(); + + /** + * Provides an opportunity for the implementation to perform configuration and initialization based on properties received from the bootstrapping configuration + * + * @param properties from the bootstrap configuration + */ + public void initialize(Properties properties, ConfigurationFileHolder configurationFileHolder) { + final String ingestorsCsv = properties.getProperty(NOTIFIER_INGESTORS_KEY); + + if (ingestorsCsv != null && !ingestorsCsv.isEmpty()) { + for (String ingestorClassname : Arrays.asList(ingestorsCsv.split(","))) { + ingestorClassname = ingestorClassname.trim(); + try { + Class<?> ingestorClass = Class.forName(ingestorClassname); + ChangeIngestor changeIngestor = (ChangeIngestor) ingestorClass.newInstance(); + changeIngestor.initialize(properties, configurationFileHolder, this); + 
changeIngestors.add(changeIngestor); + logger.info("Initialized "); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + throw new RuntimeException("Issue instantiating ingestor " + ingestorClassname, e); + } + } + } + } + + /** + * Begins the associated notification service provided by the given implementation. In most implementations, no action will occur until this method is invoked. + */ + public void start() { + changeIngestors.forEach(ChangeIngestor::start); + } + + /** + * Provides an immutable collection of listeners for the notifier instance + * + * @return a collection of those listeners registered for notifications + */ + public Set<ConfigurationChangeListener> getChangeListeners() { + return Collections.unmodifiableSet(configurationChangeListeners); + } + + /** + * Adds a listener to be notified of configuration changes + * + * @param listener to be added to the collection + * @return true if the listener was added; false if already registered + */ + public boolean registerListener(ConfigurationChangeListener listener) { + return this.configurationChangeListeners.add(listener); + } + + /** + * Provide the mechanism by which listeners are notified + */ + public Collection<ListenerHandleResult> notifyListeners(ByteBuffer newConfig) { + logger.info("Notifying Listeners of a change"); + + Collection<ListenerHandleResult> listenerHandleResults = new ArrayList<>(configurationChangeListeners.size()); + for (final ConfigurationChangeListener listener : getChangeListeners()) { --- End diff -- Works for me
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---