I wanted to discuss whether some recent development work could be shared with the NiFi community. On my project we have developed a common schema format in JSON (which is later unmarshalled into a Java object) that multiple data sources use for record processing. We also have a requirement that schemas can be modified without restarting NiFi. To support this, we have developed a custom reader controller service, a custom writer controller service, and a custom directory-based schema registry that periodically checks for schema updates, additions, and removals. We have abstracted the core functionality of the directory-based registry so that it can be extended to handle schemas in other custom formats. I would like to know whether the abstracted directory registry would be useful to others and therefore worth contributing to Apache NiFi. Attached is the proposed abstract directory registry class. Thank you!
package org.apache.nifi.registry;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Class whose intention is to provide a directory based implementation of {@link SchemaRegistry}.
 * @param <T> Object representation of a schema stored in a given directory.
*/ public abstract class DirectorySchemaRegistry<T> extends AbstractControllerService implements SchemaRegistry { private static final Set<SchemaField> SCHEMA_FIELDS = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_NAME); /**{@link PropertyDescriptor} representing the directory where schemas are to be read from*/ public static final PropertyDescriptor SCHEMA_DIRECTORY = new PropertyDescriptor.Builder() .name("schema-directory") .displayName("Schema Directory") .description("Directory where schema files are stored.") .required(true) .addValidator(StandardValidators.createDirectoryExistsValidator(true, false)) .build(); /**{@link PropertyDescriptor} representing an interval when to check for schema updates on disk*/ public static final PropertyDescriptor RELOAD_INTERVAL = new PropertyDescriptor.Builder() .name("Reload Interval") .description("Allowed elapse time before checking for schema updates") .defaultValue("60 min") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); /**java.util.List of {@link PropertyDescriptor}*/ private static final List<PropertyDescriptor> SERVICE_PROPERTIES; /**{@link ConcurrentMap} representing cache of schemas*/ private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>(); static{ final List<PropertyDescriptor> props = new ArrayList<>(); props.add(SCHEMA_DIRECTORY); props.add(RELOAD_INTERVAL); SERVICE_PROPERTIES = Collections.unmodifiableList(props); } private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private SynchronousFileWatcher fileWatcher; private Path schemaDirectory; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return SERVICE_PROPERTIES; } @SuppressWarnings("RedundantThrows") @OnEnabled public void onConfigured(final ConfigurationContext context) throws InitializationException{ getLogger().trace("Entering onConfigured"); schemaDirectory = 
Paths.get(context.getProperty(SCHEMA_DIRECTORY).getValue()); getLogger().trace("Configured schema directory is {}", schemaDirectory); loadRecordSchemas(); fileWatcher = new SynchronousFileWatcher(schemaDirectory, new LastModifiedMonitor()); ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); FilesWatcherWorker reloadTask = new FilesWatcherWorker(); TimeUnit timeUnit = TimeUnit.MILLISECONDS; long reloadIntervalMilli = context.getProperty(RELOAD_INTERVAL).asTimePeriod(timeUnit); executor.scheduleWithFixedDelay(reloadTask, 0, reloadIntervalMilli, timeUnit); } /** * Method which loads all the schemas which are located in the specified directory. */ private void loadRecordSchemas() { getLogger().trace("Entering loadRecordSchemas"); List<T> schemas = new ArrayList<>(); lock.readLock().lock(); try { //NOTE: Files.list only get files on the same level it does not recursively look at the directory List<File> schemaPaths = Files.list(schemaDirectory) .map(Path::toFile) .filter(getSchemaFilter()) .collect(Collectors.toList()); getLogger().trace("Retrieved {} schema paths", schemaPaths.size()); /*NOTE: Streams are not used here since must handle the exception thrown by getSchema.*/ if(schemaPaths.size() > 0) { for (File path : schemaPaths) { schemas.add(getSchema(path)); } getLogger().trace("Completed transform to NIFI schema format"); } else { getLogger().warn("No schemas to transform"); } } catch(IOException ioe) { getLogger().error("Error occurred when trying to get schemas from {}, details:", schemaDirectory, ioe); } finally { lock.readLock().unlock(); } if(!schemas.isEmpty()) { //NOTE: Cache is cleared as the directory may have updates and/or new schemas. recordSchemas.clear(); schemas.forEach(schema -> recordSchemas.put(getSchemaName(schema), getRecordSchema(schema))); } getLogger().trace("Exiting loadRecordSchemas()"); } /** * java.util.function.Predicate to use to filter those files in the given directory * which represent schemas. 
* @return Instance of java.util.function.Predicate. */ abstract protected Predicate<File> getSchemaFilter(); /** * Method which loads a single instance of a schema. * @param path java.io.File referring to the schema on the file system. * @return Object representing the schema * @throws IOException If error occurs when attempting to retrieve schema from the file system. */ abstract protected T getSchema(File path) throws IOException; @Override public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { if(SCHEMA_DIRECTORY.equals(descriptor)) { schemaDirectory = Paths.get(newValue); loadRecordSchemas(); } } /** * Method which returns the name the schema goes by. * @param t Object representing the schema * @return Name the schema goes by. */ abstract protected String getSchemaName(T t); /** * Transforms the schema representation into an instance of {@link RecordSchema}. * @param t Object representing the schema * @return Instance of {@link RecordSchema}. */ abstract protected RecordSchema getRecordSchema(T t); @Override public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws SchemaNotFoundException { final Optional<String> schemaName = schemaIdentifier.getName(); if (schemaName.isPresent()) { return retrieveSchemaByName(schemaName.get()); } else { throw new SchemaNotFoundException("This Schema Registry only supports retrieving a schema by name."); } } /** * Method which retrieves the schema representation as an instance of {@link RecordSchema} * from this classes instance of {@link ConcurrentMap}. * @param schemaName Name associated with the schema. * @return Instance of {@link RecordSchema}. * @throws SchemaNotFoundException If the schema cannot be found in the underlying cache of schemas. 
*/ private RecordSchema retrieveSchemaByName(final String schemaName) throws SchemaNotFoundException { final RecordSchema recordSchema = recordSchemas.get(schemaName); if (recordSchema == null) { throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'"); } return recordSchema; } @Override public Set<SchemaField> getSuppliedSchemaFields() { return SCHEMA_FIELDS; } private class FilesWatcherWorker implements Runnable { @Override public void run() { try{ if(fileWatcher.checkAndReset()){ loadRecordSchemas(); } } catch (IOException e) { getLogger().error("Failed to check file watcher!", e); } } } }