Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/1997#discussion_r165032066 --- Diff: nifi-nar-bundles/nifi-simulator-bundle/nifi-simulator-processors/src/main/java/com/apache/nifi/processors/simulator/GenerateTimeSeriesFlowFile.java --- @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.apache.nifi.processors.simulator; + +import be.cetic.tsimulus.config.Configuration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.joda.time.LocalDateTime; +import scala.Some; +import scala.Tuple3; +import scala.collection.JavaConverters; + +import java.util.List; +import java.util.Set; +import java.util.Collections; +import java.util.HashSet; +import java.util.ArrayList; + +@Tags({"Simulator, Timeseries, IOT, Testing"}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@CapabilityDescription("Generates realistic time series data using the TSimulus time series generator, and places the values into the flowfile in a CSV format.") +public class GenerateTimeSeriesFlowFile extends AbstractProcessor { + + private Configuration simConfig = null; + private boolean isTest = false; + + public static final PropertyDescriptor SIMULATOR_CONFIG = new PropertyDescriptor + .Builder().name("SIMULATOR_CONFIG") + .displayName("Simulator Configuration File") + .description("The JSON configuration file to use to configure TSimulus") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + + public static final PropertyDescriptor PRINT_HEADER = new PropertyDescriptor + .Builder().name("PRINT_HEADER") + .displayName("Print Header") + .description("Directs the processor whether to print a header line or not.") + .required(true) + .allowableValues("true", "false") + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + public static final Relationship SUCCESS = new Relationship.Builder() + .name("Success") + .description("When the flowfile is successfully generated") + .build(); + + private List<PropertyDescriptor> descriptors; + + private Set<Relationship> relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + + final List<PropertyDescriptor> descriptors = new ArrayList<>(); + + descriptors.add(SIMULATOR_CONFIG); + descriptors.add(PRINT_HEADER); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set<Relationship> getRelationships() { + return this.relationships; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return descriptors; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (SIMULATOR_CONFIG.equals(descriptor)) + simConfig = null; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + loadConfiguration(context.getProperty(SIMULATOR_CONFIG).getValue()); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + ComponentLog logger = getLogger(); + FlowFile flowFile = session.get(); + + // Create the flowfile, as it probably does not exist + if (flowFile == null) + flowFile = session.create(); + + // Get the data + String data = generateData(context.getProperty(PRINT_HEADER).asBoolean()); + + // Write the results back out to flow file + try{ + flowFile = session.write(flowFile, out -> out.write(data.getBytes())); + session.getProvenanceReporter().create(flowFile); + session.transfer(flowFile, SUCCESS); + } catch (ProcessException ex) { + logger.error("Unable to write generated data out to flowfile. Error: ", ex); + } + } + + // Loads the configuration from the file + private void loadConfiguration(String fileName){ + if (simConfig == null){ + // Load the simulator configuration + if (fileName.contains("/configs/unitTestConfig.json")) + isTest = true; + try{ + simConfig = SimController.getConfiguration(fileName); + }catch (Exception ex){ + getLogger().error("Error loading configuration: " + ex.getMessage()); + throw ex; + } + + } + } + + // Actually do the data generation via TSimulus + private String generateData(boolean printHeader){ + LocalDateTime queryTime = LocalDateTime.now(); + if(isTest) + queryTime = LocalDateTime.parse("2016-01-01T00:00:00.000"); + + // Get the time Values for the current time + scala.collection.Iterable<Tuple3<String, LocalDateTime, Object>> data = SimController.getTimeValue(simConfig.timeSeries(), queryTime); + + // Convert the Scala Iterable to a Java one + Iterable<Tuple3<String, LocalDateTime, Object>> generatedValues = JavaConverters.asJavaIterableConverter(data).asJava(); + + // Build the flow file string + StringBuilder dataValueString = new StringBuilder(); + + if (printHeader) --- End diff -- Please wrap in curly brackets.
---