I wanted to discuss whether some recent development work could be shared
with the NiFi community.
On my project we have developed a common schema format in JSON (which is
later unmarshalled into a Java object) that multiple data sources use to
perform record processing.
We also have a requirement that the schemas can be tweaked
without having to restart NiFi. We have developed a custom reader
controller service, a custom writer controller service and a custom
directory registry that periodically checks for updates to schemas and for
schemas being added or removed. We have abstracted the core functionality into
a directory-based registry which could be extended to handle other
custom-formatted schemas. I would like to know if the abstracted directory
registry would be useful for others and therefore worthy of contributing to
Apache NiFi.
Attached is the proposed abstracted directory registry class.
Thank you!
package org.apache.nifi.registry;

import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Class whose intention is to provide a directory based implementation of {@link SchemaRegistry}.
 * @param <T> Object representation of a schema stored in a given directory.
 */
public abstract class DirectorySchemaRegistry<T> extends AbstractControllerService implements SchemaRegistry {

    private static final Set<SchemaField> SCHEMA_FIELDS =
            EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_NAME);

    /**{@link PropertyDescriptor} representing the directory where schemas are to be read from*/
    public static final PropertyDescriptor SCHEMA_DIRECTORY = new PropertyDescriptor.Builder()
            .name("schema-directory")
            .displayName("Schema Directory")
            .description("Directory where schema files are stored.")
            .required(true)
            .addValidator(StandardValidators.createDirectoryExistsValidator(true, false))
            .build();

    /**{@link PropertyDescriptor} representing an interval when to check for schema updates on disk*/
    public static final PropertyDescriptor RELOAD_INTERVAL = new PropertyDescriptor.Builder()
            .name("Reload Interval")
            .description("Allowed elapse time before checking for schema updates")
            .defaultValue("60 min")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /**java.util.List of {@link PropertyDescriptor}*/
    private static final List<PropertyDescriptor> SERVICE_PROPERTIES;

    /**{@link ConcurrentMap} representing cache of schemas*/
    private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>();

    static{
        final List<PropertyDescriptor> props = new ArrayList<>();
        props.add(SCHEMA_DIRECTORY);
        props.add(RELOAD_INTERVAL);
        SERVICE_PROPERTIES = Collections.unmodifiableList(props);
    }

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private SynchronousFileWatcher fileWatcher;
    private Path schemaDirectory;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return SERVICE_PROPERTIES;
    }

    @SuppressWarnings("RedundantThrows")
    @OnEnabled
    public void onConfigured(final ConfigurationContext context) throws InitializationException{
        getLogger().trace("Entering onConfigured");
        schemaDirectory = Paths.get(context.getProperty(SCHEMA_DIRECTORY).getValue());
        getLogger().trace("Configured schema directory is {}", schemaDirectory);
        loadRecordSchemas();

        fileWatcher = new SynchronousFileWatcher(schemaDirectory, new LastModifiedMonitor());
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        FilesWatcherWorker reloadTask = new FilesWatcherWorker();
        TimeUnit timeUnit = TimeUnit.MILLISECONDS;
        long reloadIntervalMilli = context.getProperty(RELOAD_INTERVAL).asTimePeriod(timeUnit);
        executor.scheduleWithFixedDelay(reloadTask, 0, reloadIntervalMilli, timeUnit);
    }

    /**
     * Method which loads all the schemas which are located in the specified directory.
     */
    private void loadRecordSchemas() {
        getLogger().trace("Entering loadRecordSchemas");
        List<T> schemas = new ArrayList<>();
        lock.readLock().lock();
        try {
            //NOTE: Files.list only get files on the same level it does not recursively look at the directory
            List<File> schemaPaths = Files.list(schemaDirectory)
                    .map(Path::toFile)
                    .filter(getSchemaFilter())
                    .collect(Collectors.toList());
            getLogger().trace("Retrieved {} schema paths", schemaPaths.size());
            /*NOTE: Streams are not used here since must handle the exception thrown by getSchema.*/
            if(schemaPaths.size() > 0) {
                for (File path : schemaPaths) {
                    schemas.add(getSchema(path));
                }
                getLogger().trace("Completed transform to NIFI schema format");
            } else {
                getLogger().warn("No schemas to transform");
            }
        }
        catch(IOException ioe) {
            getLogger().error("Error occurred when trying to get schemas from {}, details:",
                    schemaDirectory, ioe);
        }
        finally {
            lock.readLock().unlock();
        }

        if(!schemas.isEmpty()) {
            //NOTE: Cache is cleared as the directory may have updates and/or new schemas.
            recordSchemas.clear();
            schemas.forEach(schema -> recordSchemas.put(getSchemaName(schema), getRecordSchema(schema)));
        }
        getLogger().trace("Exiting loadRecordSchemas()");
    }

    /**
     * java.util.function.Predicate to use to filter those files in the given directory
     * which represent schemas.
     * @return Instance of java.util.function.Predicate.
     */
    abstract protected Predicate<File> getSchemaFilter();

    /**
     * Method which loads a single instance of a schema.
     * @param path java.io.File referring to the schema on the file system.
     * @return Object representing the schema
     * @throws IOException If error occurs when attempting to retrieve schema from the file system.
     */
    abstract protected T getSchema(File path) throws IOException;

    @Override
    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if(SCHEMA_DIRECTORY.equals(descriptor)) {
            schemaDirectory = Paths.get(newValue);
            loadRecordSchemas();
        }
    }

    /**
     * Method which returns the name the schema goes by.
     * @param t Object representing the schema
     * @return Name the schema goes by.
     */
    abstract protected String getSchemaName(T t);

    /**
     * Transforms the schema representation into an instance of {@link RecordSchema}.
     * @param t Object representing the schema
     * @return Instance of {@link RecordSchema}.
     */
    abstract protected RecordSchema getRecordSchema(T t);

    @Override
    public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException {
        final Optional<String> schemaName = schemaIdentifier.getName();
        if (schemaName.isPresent()) {
            return retrieveSchemaByName(schemaName.get());
        } else {
            throw new SchemaNotFoundException("This Schema Registry only supports retrieving a schema by name.");
        }
    }

    /**
     * Method which retrieves the schema representation as an instance of {@link RecordSchema}
     * from this classes instance of {@link ConcurrentMap}.
     * @param schemaName Name associated with the schema.
     * @return Instance of {@link RecordSchema}.
     * @throws SchemaNotFoundException If the schema cannot be found in the underlying cache of schemas.
     */
    private RecordSchema retrieveSchemaByName(final String schemaName) throws SchemaNotFoundException {
        final RecordSchema recordSchema = recordSchemas.get(schemaName);
        if (recordSchema == null) {
            throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'");
        }
        return recordSchema;
    }

    @Override
    public Set<SchemaField> getSuppliedSchemaFields() {
        return SCHEMA_FIELDS;
    }

    private class FilesWatcherWorker implements Runnable {
        @Override
        public void run() {
            try{
                if(fileWatcher.checkAndReset()){
                  loadRecordSchemas();
                }
            } catch (IOException e) {
                getLogger().error("Failed to check file watcher!", e);
            }
        }
    }
}

Reply via email to