http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java new file mode 100644 index 0000000..9058cf4 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -0,0 +1,566 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.script; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import javax.script.ScriptException; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.io.BufferedOutputStream; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.annotation.CapabilityDescription; +import org.apache.nifi.processor.annotation.EventDriven; +import org.apache.nifi.processor.annotation.Tags; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.scripting.ConverterScript; +import org.apache.nifi.scripting.ReaderScript; +import org.apache.nifi.scripting.Script; +import org.apache.nifi.scripting.ScriptFactory; +import org.apache.nifi.scripting.WriterScript; + +/** + * <!-- Processor Documentation ================================================== --> + * <h2>Description:</h2> + * <p> + * This processor provides the capability to 
execute scripts in various + * scripting languages, and passes into the scripts the input stream and output + * stream(s) representing an incoming flow file and any created flow files. The + * processor is designed to be thread safe, so multiple concurrent tasks may + * execute against a single script. The processor provides a framework which + * enables script writers to implement 3 different types of scripts: + * <ul> + * ReaderScript - which enables stream-based reading of a FlowFile's + * content<br/> WriterScript - which enables stream-based reading and + * writing/modifying of a FlowFile's content<br/> ConverterScript - which + * enables stream-based reading of a FlowFile's content and stream-based writing to + * newly created FlowFiles<br/> + * </ul> + * Presently, the processor supports 3 scripting languages: Ruby, Python, and + * JavaScript. The processor is built on the javax.script API which enables + * ScriptEngine discovery, thread management, and encapsulates much of the low + * level bridging-code that enables Java to Script language integration. Thus, + * it is designed to be easily extended to other scripting languages. <br/> The + * attributes of a FlowFile and properties of the Processor are exposed to the + * script by either a variable in the base class or a getter method. A script + * may declare new Processor Properties and different Relationships via + * overriding the getPropertyDescriptors and getRelationships methods, + * respectively. + * </p> + * <p> + * <strong>Properties:</strong> + * </p> + * <p> + * In the list below, the names of required properties appear in bold. Any other + * properties (not in bold) are considered optional. If a property has a default + * value, it is indicated. If a property supports the use of the NiFi Expression + * Language (or simply, "expression language"), that is also indicated. 
Of + * particular note: This processor allows scripts to define additional Processor + * properties, which will not be initially visible. Once the processor's + * configuration is validated, script defined properties will become visible, + * and may affect the validity of the processor. + * </p> + * <ul> + * <li> + * <strong>Script File Name</strong> + * <ul> + * <li>Script location, can be relative or absolute path.</li> + * <li>Default value: no default</li> + * <li>Supports expression language: false</li> + * </ul> + * </li> + * <li> + * <strong>Script Check Interval</strong> + * <ul> + * <li>The time period between checking for updates to a script.</li> + * <li>Default value: 15 sec</li> + * <li>Supports expression language: false</li> + * </ul> + * </li> + * </ul> + * + * <p> + * <strong>Relationships:</strong> + * </p> + * <p> + * The initial 'out of the box' relationships are below. Of particular note is + * the ability of a script to change the set of relationships. However, any + * relationships defined by the script will not be visible until the processor's + * configuration has been validated. Once done, new relationships will become + * visible. + * </p> + * <ul> + * <li> + * success + * <ul> + * <li>Used when a file is successfully processed by a script.</li> + * </ul> + * </li> + * <li> + * failure + * <ul> + * <li>Used when an error occurs while processing a file with a script.</li> + * </ul> + * </li> + * </ul> + * + * <p> + * <strong>Example Scripts:</strong> + * </p> + * <ul> + * JavaScript example - the 'with' statement imports packages defined in the + * framework. Since the 'instance' variable is intended to be local scope (not + * global), it must be named 'instance' as it is not passed back to the + * processor upon script evaluation and must be fetched. If you make it global, + * you can name it whatever you'd like...but this is intended to be + * multi-threaded so do so at your own risk. 
Presently, there are issues with + * the JavaScript scripting engine that prevent sub-classing the base classes in + * the Processor's Java framework. So, what is actually happening is an instance + * of the ReaderScript is created with a provided callback object. When we are + * able to move to a more competent scripting engine, the code below will remain + * the same, but the 'instance' variable will actually be a sub-class of + * ReaderScript. + * + * <pre> + * with (Scripting) { + * var instance = new ReaderScript({ + * route : function(input) { + * var str = IOUtils.toString(input); + * var expr = instance.getProperty("expr"); + * filename = instance.attributes.get("filename"); + * instance.setAttribute("filename", filename + ".modified"); + * if (str.match(expr)) { + * return Script.FAIL_RELATIONSHIP; + * } else { + * return Script.SUCCESS_RELATIONSHIP; + * } + * } + * }); + * } + * </pre> + * + * Ruby example - the 'OutputStreamHandler' is an interface which is called when + * creating flow files. 
+ * + * <pre> + * java_import 'org.apache.nifi.scripting.OutputStreamHandler' + * class SimpleConverter < ConverterScript + * field_reader :FAIL_RELATIONSHIP, :SUCCESS_RELATIONSHIP, :logger, :attributes + * + * def convert(input) + * in_io = input.to_io + * createFlowFile("firstLine", FAIL_RELATIONSHIP, OutputStreamHandler.impl do |method, out| + * out_io = out.to_io + * out_io << in_io.readline.to_java_bytes + * out_io.close + * logger.debug("Wrote data to failure...this message logged with logger from super class") + * end) + * + * createFlowFile("otherLines", SUCCESS_RELATIONSHIP, OutputStreamHandler.impl do |method, out| + * out_io = out.to_io + * in_io.each_line { |line| + * out_io << line + * } + * out_io.close + * logger.debug("Wrote data to success...this message logged with logger from super class") + * end) + * in_io.close + * end + * + * end + * + * $logger.debug("Creating SimpleConverter...this message logged with logger from shared variables") + * SimpleConverter.new + * </pre> + * + * Python example - The difficulty with Python is that it does not return + * objects upon script evaluation, so the instance of the Script class must be + * fetched by name. Thus, you must define a variable called 'instance'. 
+ * + * <pre> + * import re + * + * class RoutingReader(ReaderScript): + * A = Relationship.Builder().name("a").description("some good stuff").build() + * B = Relationship.Builder().name("b").description("some other stuff").build() + * C = Relationship.Builder().name("c").description("some bad stuff").build() + * + * def getRelationships(self): + * return [self.A,self.B,self.C] + * + * def getExceptionRoute(self): + * return self.C + * + * def route( self, input ): + * for line in FileUtil.wrap(input): + * if re.match("^bad", line, re.IGNORECASE): + * return self.B + * if re.match("^sed", line): + * raise RuntimeError("That's no good!") + * + * return self.A + * + * instance = RoutingReader() + * </pre> + * + * </ul> + * <p> + * <strong>Shared Variables</strong> + * </p> + * <ul> + * <li>logger : global scope</li> + * <li>properties : local/instance scope</li> + * </ul> + * <p> + * <strong>Script API:</strong> + * </p> + * <ul> + * <li>getAttribute(String) : String</li> + * <li>getAttributes() : Map(String,String)</li> + * <li>getExceptionRoute() : Relationship</li> + * <li>getFileName() : String</li> + * <li>getFlowFileEntryDate() : Calendar</li> + * <li>getFlowFileSize() : long</li> + * <li>getProperties() : Map(String, String)</li> + * <li>getProperty(String) : String</li> + * <li>getPropertyDescriptors() : List(PropertyDescriptor)</li> + * <li>getRelationships() : Collection(Relationship)</li> + * <li>getRoute() : Relationship</li> + * <li>setRoute(Relationship)</li> + * <li>setAttribute(String, String)</li> + * <li>validate() : Collection(String)</li> + * </ul> + * <p> + * <strong>ReaderScript API:</strong> + * </p> + * <ul> + * <li>route(InputStream) : Relationship</li> + * </ul> + * <p> + * <strong>WriterScript API:</strong> + * </p> + * <ul> + * <li>process(InputStream, OutputStream)</li> + * </ul> + * <p> + * <strong>ConverterScript API:</strong> + * </p> + * <ul> + * <li>convert(InputStream)</li> + * <li>createFlowFile(String, Relationship, 
OutputStreamHandler)</li> + * </ul> + * <p> + * <strong>OutputStreamHandler API:</strong> + * </p> + * <ul> + * <li>write(OutputStream)</li> + * </ul> + */ +@EventDriven +@Tags({"script", "ruby", "python", "javascript", "execute"}) +@CapabilityDescription("Execute scripts in various scripting languages, and passes into the scripts the input stream and output stream(s) " + + "representing an incoming flow file and any created flow files.") +public class ExecuteScript extends AbstractProcessor { + + private final AtomicBoolean doCustomValidate = new AtomicBoolean(true); + private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>(); + private final AtomicReference<List<PropertyDescriptor>> propertyDescriptors = new AtomicReference<>(); + private volatile ScriptFactory scriptFactory; + private volatile Relationship exceptionRoute; + + /** + * Script location, can be relative or absolute path -- passed as-is to + * {@link File#File(String) File constructor} + */ + public static final PropertyDescriptor SCRIPT_FILE_NAME = new PropertyDescriptor.Builder() + .name("Script File Name") + .description("Script location, can be relative or absolute path") + .required(true) + .addValidator(new Validator() { + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR.validate(subject, input, context); + if (result.isValid()) { + int dotPos = input.lastIndexOf('.'); + if (dotPos < 1) { + result = new ValidationResult.Builder() + .subject(subject) + .valid(false) + .explanation("Filename must have an extension") + .input(input) + .build(); + } + } + return result; + } + }) + .build(); + + static final PropertyDescriptor SCRIPT_CHECK_INTERVAL = new PropertyDescriptor.Builder() + .name("Script Check Interval") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .description("The time period between checking for updates to a script") 
+ .required(true) + .defaultValue("15 sec") + .build(); + + @Override + protected void init(ProcessorInitializationContext context) { + Set<Relationship> empty = Collections.emptySet(); + relationships.set(empty); + ArrayList<PropertyDescriptor> propDescs = new ArrayList<>(); + propDescs.add(SCRIPT_FILE_NAME); + propDescs.add(SCRIPT_CHECK_INTERVAL); + propertyDescriptors.set(Collections.unmodifiableList(propDescs)); + scriptFactory = new ScriptFactory(getLogger()); + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return propertyDescriptors.get(); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .dynamic(true) + .addValidator(Validator.VALID) + .build(); + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + doCustomValidate.set(true); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships.get(); + } + + /** + * Called by framework. + * + * Returns a list of reasons why this processor cannot be run. + * @return + */ + @Override + protected Collection<ValidationResult> customValidate(ValidationContext validationContext) { + if (doCustomValidate.getAndSet(false)) { + long interval = validationContext.getProperty(SCRIPT_CHECK_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS); + scriptFactory.setScriptCheckIntervalMS(interval); + List<ValidationResult> results = new ArrayList<>(); + String file = validationContext.getProperty(SCRIPT_FILE_NAME).getValue(); + try { + Script s = scriptFactory.getScript(file); + + // set the relationships of the processor + relationships.set(new HashSet<>(s.getRelationships())); + + // need to get script's prop. descs. and validate. May, or may not, have dynamic + // props already...depends if this is the first time the processor is being configured. 
+ Map<PropertyDescriptor, String> properties = validationContext.getProperties(); + + // need to compare props, if any, against script-expected props that are required. + // script may be expecting required props that are not known, or some props may have invalid + // values. + // processor may be configured with dynamic props that the script will use...but does not declare which would + // be a bad thing + List<PropertyDescriptor> scriptPropDescs = s.getPropertyDescriptors(); + getLogger().debug("Script is {}", new Object[]{s}); + getLogger().debug("Script file name is {}", new Object[]{s.getFileName()}); + getLogger().debug("Script Prop Descs are: {}", new Object[]{scriptPropDescs.toString()}); + getLogger().debug("Thread is: {}", new Object[]{Thread.currentThread().toString()}); + for (PropertyDescriptor propDesc : scriptPropDescs) { + // need to check for missing props + if (propDesc.isRequired() && !properties.containsKey(propDesc)) { + results.add(new ValidationResult.Builder() + .subject("Script Properties") + .valid(false) + .explanation("Missing Property " + propDesc.getName()) + .build()); + + // need to validate current value against script provided validator + } else if (properties.containsKey(propDesc)) { + String value = properties.get(propDesc); + ValidationResult result = propDesc.validate(value, validationContext); + if (!result.isValid()) { + results.add(result); + } + } // else it is an optional prop according to the script and it is not specified by + // the configuration of the processor + } + + // need to update the known prop desc's with what we just got from the script + List<PropertyDescriptor> pds = new ArrayList<>(propertyDescriptors.get()); + pds.addAll(scriptPropDescs); + propertyDescriptors.set(Collections.unmodifiableList(pds)); + + if (results.isEmpty()) { + // so needed props are supplied and individually validated, now validate script + Collection<String> reasons; + reasons = s.validate(); + if (null == reasons) { + 
getLogger().warn("Script had invalid return value for validate(), ignoring."); + } else { + for (String reason : reasons) { + ValidationResult result = new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation(reason) + .build(); + results.add(result); + } + } + } + + // get the exception route + exceptionRoute = s.getExceptionRoute(); + + return results; + } catch (ScriptException | IOException | NoSuchMethodException e) { + doCustomValidate.set(true); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Cannot create script due to " + e.getMessage()) + .input(file) + .build()); + getLogger().error("Cannot create script due to " + e, e); + return results; + } + } + return null; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; // fail-fast if there is no work to do + } + + final String scriptFileName = context.getProperty(SCRIPT_FILE_NAME).getValue(); + // doing this cloning because getProperties does not initialize props that have only their default values + // must do a getProperty for that value to be initialized + Map<String, String> props = new HashMap<>(); + for (PropertyDescriptor propDesc : context.getProperties().keySet()) { + if (propDesc.isExpressionLanguageSupported()) { + props.put(propDesc.getName(), context.getProperty(propDesc).evaluateAttributeExpressions(flowFile).getValue()); + } else { + props.put(propDesc.getName(), context.getProperty(propDesc).getValue()); + } + } + Script script = null; + try { + final Script finalScript = scriptFactory.getScript(scriptFileName, props, flowFile); + script = finalScript; + if (finalScript instanceof ReaderScript) { + session.read(flowFile, new InputStreamCallback() { + + @Override + public void process(InputStream in) throws IOException { + try { + ((ReaderScript) 
finalScript).process(new BufferedInputStream(in)); + } catch (NoSuchMethodException | ScriptException e) { + getLogger().error("Failed to execute ReaderScript", e); + throw new IOException(e); + } + } + }); + } else if (finalScript instanceof WriterScript) { + flowFile = session.write(flowFile, new StreamCallback() { + + @Override + public void process(InputStream in, OutputStream out) throws IOException { + try { + ((WriterScript) finalScript).process(new BufferedInputStream(in), new BufferedOutputStream(out)); + out.flush(); + } catch (NoSuchMethodException | ScriptException e) { + getLogger().error("Failed to execute WriterScript", e); + throw new IOException(e); + } + } + }); + } else if (finalScript instanceof ConverterScript) { + ((ConverterScript) finalScript).process(session); + + // Note that these scripts don't pass the incoming FF through, + // they always create new outputs + session.remove(flowFile); + return; + } else { + // only thing we can do is assume script has already run and done it's thing, so just transfer the incoming + // flowfile + getLogger().debug("Successfully executed script from {}", new Object[]{scriptFileName}); + } + + // update flow file attributes + flowFile = session.putAllAttributes(flowFile, finalScript.getAttributes()); + Relationship route = finalScript.getRoute(); + if (null == route) { + session.remove(flowFile); + getLogger().info("Removing flowfile {}", new Object[]{flowFile}); + } else { + session.transfer(flowFile, route); + getLogger().info("Transferring flowfile {} to {}", new Object[]{flowFile, route}); + } + } catch (ScriptException | IOException e) { + getLogger().error("Failed to create script from {} with flowFile {}. Rolling back session.", + new Object[]{scriptFileName, flowFile}, e); + throw new ProcessException(e); + } catch (Exception e) { + if (null != script) { + getLogger().error("Failed to execute script from {}. 
Transferring flow file {} to {}", + new Object[]{scriptFileName, flowFile, exceptionRoute}, e); + session.transfer(flowFile, exceptionRoute); + } else { + getLogger().error("Failed to execute script from {} with flowFile {}. Rolling back session", + new Object[]{scriptFileName, flowFile}, e); + throw new ProcessException(e); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java new file mode 100644 index 0000000..7be47a8 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ConverterScript.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.scripting; + +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import javax.script.Invocable; +import javax.script.ScriptException; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; + +/** + * <p> + * Script authors should extend this class if they want to perform complex + * conversions in a NiFi processor. + * </p> + * + * <p> + * Scripts must implement {@link #convert(FileInputStream)}. This method may + * create new FlowFiles and pass them to one or more routes. The input FlowFile + * will be removed from the repository after execution of this method completes. + * </p> + * + * <p> + * In general, the {@link #convert(FileInputStream)} will read from the supplied + * stream, then create one or more output sinks and route the result to the + * relationship of choice using + * {@link #routeStream(ByteArrayOutputStream, String, String)} or + * {@link #routeBytes(byte[], String, String)}. + * + * <p> + * Implement {@link #getProcessorRelationships()} to allow writing to + * relationships other than <code>success</code> and <code>failure</code>. The + * {@link #getRoute()} superclass method is *not* used by Converter Scripts. + * </p> + * + */ +public class ConverterScript extends Script { + + private ProcessSession session; // used to create files + private Object convertCallback; + + public ConverterScript() { + + } + + public ConverterScript(Object... 
callbacks) { + super(callbacks); + for (Object callback : callbacks) { + if (callback instanceof Map<?, ?>) { + convertCallback = convertCallback == null && ((Map<?, ?>) callback).containsKey("convert") ? callback : convertCallback; + } + } + } + + // Subclasses should implement this to define basic logic + protected void convert(InputStream stream) throws NoSuchMethodException, ScriptException { + if (convertCallback != null) { + ((Invocable) engine).invokeMethod(convertCallback, "convert", stream); + } + } + + /** + * Owning processor uses this method to kick off handling of a single file + * + * @param aSession the owning processor's Repository (needed to make new + * files) + */ + public void process(ProcessSession aSession) { + this.session = aSession; + this.session.read(this.flowFile, new InputStreamCallback() { + + @Override + public void process(InputStream in) throws IOException { + BufferedInputStream stream = new BufferedInputStream(in); + try { + convert(stream); + } catch (NoSuchMethodException | ScriptException e) { + logger.error("Failed to execute 'convert' function in script", e); + throw new IOException(e); + } + } + }); + } + + // this should go back to protected once we get Nashorn + public void createFlowFile(final String flowFileName, final Relationship relationship, final OutputStreamHandler handler) { + FlowFile result = session.create(this.flowFile); + result = session.putAttribute(result, CoreAttributes.FILENAME.key(), flowFileName); + try { + result = session.write(result, new OutputStreamCallback() { + + @Override + public void process(OutputStream out) throws IOException { + handler.write(out); + } + }); + this.logger.info("Transfer flow file {} to {}", new Object[]{result, relationship}); + session.transfer(result, relationship); + } catch (Exception e) { + this.logger.error("Could not create new flow file from script", e); + session.remove(result); + } + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java new file mode 100644 index 0000000..883b688 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JRubyScriptFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.scripting; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; + +public enum JRubyScriptFactory { + + INSTANCE; + + private static final String PRELOADS = "include Java\n" + + "\n" + + "java_import 'org.apache.nifi.components.PropertyDescriptor'\n" + + "java_import 'org.apache.nifi.components.Validator'\n" + + "java_import 'org.apache.nifi.processor.util.StandardValidators'\n" + + "java_import 'org.apache.nifi.processor.Relationship'\n" + + "java_import 'org.apache.nifi.logging.ProcessorLog'\n" + + "java_import 'org.apache.nifi.scripting.ReaderScript'\n" + + "java_import 'org.apache.nifi.scripting.WriterScript'\n" + + "java_import 'org.apache.nifi.scripting.ConverterScript'\n" + + "\n"; + + public String getScript(File scriptFile) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append(PRELOADS) + .append(FileUtils.readFileToString(scriptFile, "UTF-8")); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java new file mode 100644 index 0000000..774fb1f --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JavaScriptScriptFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.scripting; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; + +public enum JavaScriptScriptFactory { + + INSTANCE; + + private static final String PRELOADS = "var Scripting = JavaImporter(\n" + + " Packages.org.apache.nifi.components,\n" + + " Packages.org.apache.nifi.processor.util,\n" + + " Packages.org.apache.nifi.processor,\n" + + " Packages.org.apache.nifi.logging,\n" + + " Packages.org.apache.nifi.scripting,\n" + + " Packages.org.apache.commons.io);\n" + + "var readFile = function (file) {\n" + + " var script = Packages.org.apache.commons.io.FileUtils.readFileToString(" + + " new java.io.File($PATH, file)" + + " );\n" + + " return \"\" + script;\n" + + "}\n" + + "var require = function (file){\n" + + " var exports={}, module={};\n" + + " module.__defineGetter__('id', function(){return file;});" + + " eval(readFile(file));\n" + + " return exports;\n" + + "}\n"; + + public String getScript(File scriptFile) throws IOException { + StringBuilder sb = new StringBuilder(); + final String parent = StringUtils.replace(scriptFile.getParent(), "\\", "/"); + sb.append(PRELOADS).append("var $PATH = \"").append(parent).append("\"\n") + .append(FileUtils.readFileToString(scriptFile, "UTF-8")); + return sb.toString(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java new file mode 100644 index 0000000..6b40b5e --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/JythonScriptFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.scripting; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; + +public enum JythonScriptFactory { + + INSTANCE; + + private final static String PRELOADS = "from org.python.core.util import FileUtil\n" + + "from org.apache.nifi.components import PropertyDescriptor\n" + + "from org.apache.nifi.components import Validator\n" + + "from org.apache.nifi.processor.util import StandardValidators\n" + + "from org.apache.nifi.processor import Relationship\n" + + "from org.apache.nifi.logging import ProcessorLog\n" + + "from org.apache.nifi.scripting import ReaderScript\n" + + "from org.apache.nifi.scripting import WriterScript\n" + + "from org.apache.nifi.scripting import ConverterScript\n"; + + public String getScript(File scriptFile) throws IOException { + StringBuilder sb = new StringBuilder(); + sb.append(PRELOADS) + .append(FileUtils.readFileToString(scriptFile, "UTF-8")); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java new file mode 100644 index 0000000..d879722 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/OutputStreamHandler.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
/**
 * Callback that is handed an {@link OutputStream}.
 *
 * <p>
 * The method declares no checked exceptions, so an implementation has to deal
 * with any {@link java.io.IOException} itself.
 * </p>
 */
public interface OutputStreamHandler {

    /**
     * Invoked with the stream the implementation is given to work with.
     *
     * @param out the stream passed to the handler
     */
    void write(OutputStream out);
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.scripting; + +import java.io.InputStream; +import java.util.Map; + +import javax.script.Invocable; +import javax.script.ScriptException; + +import org.apache.nifi.processor.Relationship; + +/** + * <p> + * Script authors should extend this class if they want to follow the "reader" + * paradigm for NiFi processors. + * </p> + * + * <p> + * User scripts should implement {@link #route(InputStream)}. <code>route</code> + * uses a returned relationship name to determine where FlowFiles go. Scripts + * may also implement {@link #getProcessorRelationships()} to specify available + * relationship names. + * </p> + * + */ +public class ReaderScript extends Script { + + private Object routeCallback; + + public ReaderScript(Object... callbacks) { + super(callbacks); + for (Object callback : callbacks) { + if (callback instanceof Map<?, ?>) { + routeCallback = routeCallback == null && ((Map<?, ?>) callback).containsKey("route") ? callback : routeCallback; + } + } + } + + public ReaderScript() { + + } + + // Simple helper + public void process(InputStream input) throws NoSuchMethodException, ScriptException { + lastRoute = route(input); + } + + /** + * Subclasses should examine the provided inputstream, then determine which + * relationship the file will be sent down and return its name. + * + * + * @param in a Java InputStream containing the incoming FlowFile. 
+ * @return a relationship name + * @throws ScriptException + * @throws NoSuchMethodException + */ + public Relationship route(InputStream in) throws NoSuchMethodException, ScriptException { + Relationship relationship = null; + Invocable invocable = (Invocable) this.engine; + relationship = (Relationship) invocable.invokeMethod(routeCallback, "route", in); + return relationship; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java new file mode 100644 index 0000000..786f541 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.scripting; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.script.Invocable; +import javax.script.ScriptEngine; +import javax.script.ScriptException; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.Relationship; + +/** + * <p> + * Base class for all scripts. In this framework, only ScriptEngines that + * implement javax.script.Invocable are supported. + * + * </p> + * + */ +public class Script { + + public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder() + .name("success") + .description("Destination of successfully created flow files") + .build(); + public static final Relationship FAIL_RELATIONSHIP = new Relationship.Builder() + .name("failure") + .description("Destination of flow files when a error occurs in the script") + .build(); + + static final Set<Relationship> RELATIONSHIPS; + + static { + Set<Relationship> rels = new HashSet<>(); + rels.add(FAIL_RELATIONSHIP); + rels.add(SUCCESS_RELATIONSHIP); + RELATIONSHIPS = Collections.unmodifiableSet(rels); + } + + FlowFile flowFile = null; + ScriptEngine engine = null; + + protected Map<String, String> properties = new HashMap<>(); + protected Relationship lastRoute = SUCCESS_RELATIONSHIP; + protected ProcessorLog logger; + protected String scriptFileName; + protected Map<String, String> attributes = new HashMap<>(); + protected long flowFileSize = 0; + protected long flowFileEntryDate = System.currentTimeMillis(); + + // the following are needed due to an inadequate JavaScript ScriptEngine. It will not allow + // subclassing a Java Class, only implementing a Java Interface. 
So, the syntax of JavaScript + // scripts looks like subclassing, but actually is just constructing a Script instance and + // passing in functions as args to the constructor. When we move to Nashorn JavaScript ScriptEngine + // in Java 8, we can get rid of these and revert the subclasses of this class to abstract. + protected Object propDescCallback; + protected Object relationshipsCallback; + protected Object validateCallback; + protected Object exceptionRouteCallback; + + /** + * Create a Script without any parameters + */ + public Script() { + } + + public Script(Object... callbacks) { + for (Object callback : callbacks) { + if (callback instanceof Map<?, ?>) { + propDescCallback = propDescCallback == null && ((Map<?, ?>) callback).containsKey("getPropertyDescriptors") ? callback + : propDescCallback; + relationshipsCallback = relationshipsCallback == null && ((Map<?, ?>) callback).containsKey("getRelationships") ? callback + : relationshipsCallback; + validateCallback = validateCallback == null && ((Map<?, ?>) callback).containsKey("validate") ? callback : validateCallback; + exceptionRouteCallback = exceptionRouteCallback == null && ((Map<?, ?>) callback).containsKey("getExceptionRoute") ? callback + : exceptionRouteCallback; + } + } + } + + /** + * Specify a set of properties with corresponding NiFi validators. + * + * Subclasses that do not override this method will still have access to all + * properties via the "properties" field + * + * @return a list of PropertyDescriptors + * @throws ScriptException + * @throws NoSuchMethodException + */ + @SuppressWarnings("unchecked") + public List<PropertyDescriptor> getPropertyDescriptors() throws NoSuchMethodException, ScriptException { + if (propDescCallback != null) { + return (List<PropertyDescriptor>) ((Invocable) engine).invokeMethod(propDescCallback, "getPropertyDescriptors", (Object) null); + } + return Collections.emptyList(); + } + + /** + * Specify a set of reasons why this processor should be invalid. 
+ * + * Subclasses that do not override this method will depend only on + * individual property validators as specified in + * {@link #getPropertyDescriptors()}. + * + * @return a Collection of messages to display to the user, or an empty + * Collection if the processor configuration is OK. + * @throws ScriptException + * @throws NoSuchMethodException + */ + @SuppressWarnings("unchecked") + public Collection<String> validate() throws NoSuchMethodException, ScriptException { + if (validateCallback != null) { + return (Collection<String>) ((Invocable) engine).invokeMethod(validateCallback, "validate", (Object) null); + } + return Collections.emptyList(); + } + + void setFlowFile(FlowFile ff) { + flowFile = ff; + if (null != ff) { + // have to clone because ff.getAttributes is unmodifiable + this.attributes = new HashMap<>(ff.getAttributes()); + this.flowFileSize = ff.getSize(); + this.flowFileEntryDate = ff.getEntryDate(); + } + } + + void setProperties(Map<String, String> map) { + properties = new HashMap<>(map); + } + + /** + * Required to access entire properties map -- Jython (at least) won't let + * you read the member variable without a getter + * + * @return entire parameter map + */ + // change back to protected when we get nashorn + public Map<String, String> getProperties() { + return properties; + } + + /** + * Get the named parameter. Some scripting languages make a method call + * easier than accessing a member field, so this is a convenience method to + * look up values in the properties field. + * + * @param key a hash key + * @return the value pointed at by the key specified + */ + public String getProperty(String key) { + return properties.get(key); + } + + /** + * Name the various relationships by which a file can leave this processor. + * Subclasses may override this method to change available relationships. 
+ * + * @return a collection of relationship names + * @throws ScriptException + * @throws NoSuchMethodException + */ + @SuppressWarnings("unchecked") + public Collection<Relationship> getRelationships() throws NoSuchMethodException, ScriptException { + if (relationshipsCallback != null) { + return (Collection<Relationship>) ((Invocable) engine).invokeMethod(relationshipsCallback, "getRelationships", (Object) null); + } + return RELATIONSHIPS; + } + + /** + * Determine what do with a file that has just been processed. + * + * After a script runs its "read" or "write" method, it should update the + * "lastRoute" field to specify the relationship to which the resulting file + * will be sent. + * + * @return a relationship name + */ + public Relationship getRoute() { + return lastRoute; + } + + // Required because of a potential issue in Rhino -- protected methods are visible in + // subclasses but protected fields (like "lastRoute") are not + // change back to protected when we get nashorn + public void setRoute(Relationship route) { + lastRoute = route; + } + + /** + * Determine where to send a file if an exception is thrown during + * processing. + * + * Subclasses may override this method to use a different relationship, or + * to determine the relationship dynamically. Returning null causes the file + * to be deleted instead. + * + * Defaults to "failure". + * + * @return the name of the relationship to use in event of an exception, or + * null to delete the file. 
+ * @throws ScriptException + * @throws NoSuchMethodException + */ + public Relationship getExceptionRoute() throws NoSuchMethodException, ScriptException { + if (exceptionRouteCallback != null) { + return (Relationship) ((Invocable) engine).invokeMethod(exceptionRouteCallback, "getExceptionRoute", (Object) null); + } + return FAIL_RELATIONSHIP; + } + + /* + * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get + * the incoming flow file size. + */ + // Change back to protected when we get nashorn + public long getFlowFileSize() { + return flowFileSize; + } + + /* + * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get + * entry date of the flow file. + */ + // Change back to protected when we get nashorn + public long getFlowFileEntryDate() { + return flowFileEntryDate; + } + + void setLogger(ProcessorLog logger) { + this.logger = logger; + } + + /* + * Required so that scripts in some languages can read access the attribute. Jython (at least) won't let you read the member + * variable without a getter + */ + protected ProcessorLog getLogger() { + return this.logger; + } + + void setFileName(String scriptFileName) { + this.scriptFileName = scriptFileName; + } + + public String getFileName() { + return this.scriptFileName; + } + + // this one's public because it's needed by ExecuteScript to update the flow file's attributes AFTER processing is done + public Map<String, String> getAttributes() { + return this.attributes; + } + + /* + * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to look + * up values in the attributes field. 
+ */ + // Change back to protected when we get nashorn + public String getAttribute(String key) { + return this.attributes.get(key); + } + + /* + * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to set + * key/value pairs in the attributes field. + */ + // Change back to protected when we get nashorn + public void setAttribute(String key, String value) { + this.attributes.put(key, value); + } + + void setEngine(ScriptEngine scriptEngine) { + this.engine = scriptEngine; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java new file mode 100644 index 0000000..6f38886 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.scripting; + +import java.io.File; +import java.util.concurrent.ConcurrentHashMap; + +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; + +import org.apache.commons.lang3.StringUtils; +import org.jruby.embed.PropertyName; + +public class ScriptEngineFactory { + + private static final String THREADING = "THREADING"; + private static final String MULTITHREADED = "MULTITHREADED"; + private static final String STATELESS = "STATELESS"; + private static final String THREAD_ISOLATED = "THREAD-ISOLATED"; + final static ScriptEngineManager scriptEngMgr; + + static { + System.setProperty(PropertyName.LOCALCONTEXT_SCOPE.toString(), "singlethread"); + System.setProperty(PropertyName.COMPILEMODE.toString(), "jit"); + System.setProperty(PropertyName.COMPATVERSION.toString(), "JRuby1.9"); + System.setProperty(PropertyName.LOCALVARIABLE_BEHAVIOR.toString(), "transient"); + System.setProperty("compile.invokedynamic", "false"); + System.setProperty(PropertyName.LAZINESS.toString(), "true"); + scriptEngMgr = new ScriptEngineManager(); + } + final ConcurrentHashMap<String, ScriptEngine> threadSafeEngines = new ConcurrentHashMap<>(); + + ScriptEngine getEngine(String extension) { + ScriptEngine engine = threadSafeEngines.get(extension); + if (null == engine) { + engine = scriptEngMgr.getEngineByExtension(extension); + if (null == engine) { + throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension); + } + + Object threading = engine.getFactory().getParameter(THREADING); + // the MULTITHREADED status means that the scripts need to be careful about sharing state + if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) { + ScriptEngine cachedEngine = threadSafeEngines.putIfAbsent(extension, engine); + if (null != 
cachedEngine) { + engine = cachedEngine; + } + } + } + return engine; + } + + ScriptEngine getNewEngine(File scriptFile, String extension) throws ScriptException { + ScriptEngine engine = scriptEngMgr.getEngineByExtension(extension); + if (null == engine) { + throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension); + } + // Initialize some paths + StringBuilder sb = new StringBuilder(); + switch (extension) { + case "rb": + String parent = scriptFile.getParent(); + parent = StringUtils.replace(parent, "\\", "/"); + sb.append("$:.unshift '") + .append(parent) + .append("'\n") + .append("$:.unshift File.join '") + .append(parent) + .append("', 'lib'\n"); + engine.eval(sb.toString()); + + break; + case "py": + parent = scriptFile.getParent(); + parent = StringUtils.replace(parent, "\\", "/"); + String lib = parent + "/lib"; + sb.append("import sys\n").append("sys.path.append('").append(parent) + .append("')\n").append("sys.path.append('") + .append(lib) + .append("')\n") + .append("__file__ = '") + .append(scriptFile.getAbsolutePath()) + .append("'\n"); + engine.eval(sb.toString()); + break; + default: + break; + } + + Object threading = engine.getFactory().getParameter(THREADING); + // the MULTITHREADED status means that the scripts need to be careful about sharing state + if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) { + // replace prior instance if any + threadSafeEngines.put(extension, engine); + } + return engine; + } + + boolean isThreadSafe(String scriptExtension) { + return threadSafeEngines.containsKey(scriptExtension); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java ---------------------------------------------------------------------- diff --git 
a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java new file mode 100644 index 0000000..da18606 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.scripting; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.DigestInputStream; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import javax.script.Bindings; +import javax.script.Compilable; +import javax.script.CompiledScript; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import javax.script.SimpleBindings; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.io.BufferedInputStream; +import org.apache.nifi.logging.ProcessorLog; + +import org.apache.commons.io.FileUtils; + +/** + * While this is a 'factory', it is not a singleton because we want a factory + * per processor. This factory has state, all of which belong to only one + * processor. 
+ * + */ +public class ScriptFactory { + + private final ScriptEngineFactory engineFactory = new ScriptEngineFactory(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final ReadLock readLock = lock.readLock(); + private final WriteLock writeLock = lock.writeLock(); + private final ProcessorLog logger; + + private volatile CompiledScript compiledScript; + private volatile String scriptText; + private volatile byte[] md5Hash; + private volatile long lastTimeChecked; + private volatile String scriptFileName; + private volatile long scriptCheckIntervalMS = 15000; + + public ScriptFactory(ProcessorLog logger) { + this.logger = logger; + } + + public void setScriptCheckIntervalMS(long msecs) { + this.scriptCheckIntervalMS = msecs; + } + + /** + * @param aScriptFileName + * @param properties + * @param flowFile + * @return + * @throws IOException + * @throws ScriptException + */ + public Script getScript(final String aScriptFileName, final Map<String, String> properties, final FlowFile flowFile) + throws IOException, ScriptException { + final Script instance; + long now = System.currentTimeMillis(); + readLock.lock(); + try { + if (!aScriptFileName.equals(this.scriptFileName)) { + readLock.unlock(); + writeLock.lock(); + try { + if (!aScriptFileName.equals(this.scriptFileName)) { + // need to get brand new engine + compiledScript = null; + this.md5Hash = getMD5Hash(aScriptFileName); + this.lastTimeChecked = now; + this.scriptFileName = aScriptFileName; + updateEngine(); + } // else another thread beat me to the change...so just get a script + } finally { + readLock.lock(); + writeLock.unlock(); + } + } else if (lastTimeChecked + scriptCheckIntervalMS < now) { + readLock.unlock(); + writeLock.lock(); + try { + if (lastTimeChecked + scriptCheckIntervalMS < now) { + byte[] md5 = getMD5Hash(this.scriptFileName); + if (!MessageDigest.isEqual(md5Hash, md5)) { + // need to get brand new engine + compiledScript = null; + updateEngine(); + 
this.md5Hash = md5; + } // else no change to script, so just update time checked + this.lastTimeChecked = now; + } // else another thread beat me to the check...so just get a script + } finally { + readLock.lock(); + writeLock.unlock(); + } + } + try { + instance = getScriptInstance(properties); + instance.setFileName(this.scriptFileName); + instance.setProperties(properties); + instance.setLogger(logger); + instance.setFlowFile(flowFile); + } catch (ScriptException e) { + // need to reset state to enable re-initialization + this.lastTimeChecked = 0; + this.scriptFileName = null; + throw e; + } + } finally { + readLock.unlock(); + } + + return instance; + + } + + public Script getScript(String aScriptFileName) throws ScriptException, IOException { + Map<String, String> props = new HashMap<>(); + return getScript(aScriptFileName, props, null); + } + + private byte[] getMD5Hash(String aScriptFileName) throws FileNotFoundException, IOException { + byte[] messageDigest = null; + try (FileInputStream fis = new FileInputStream(aScriptFileName); + DigestInputStream dis = new DigestInputStream(new BufferedInputStream(fis), MessageDigest.getInstance("MD5"))) { + + byte[] bytes = new byte[8192]; + while (dis.read(bytes) != -1) { + // do nothing...just computing the md5 hash + } + messageDigest = dis.getMessageDigest().digest(); + } catch (NoSuchAlgorithmException swallow) { + // MD5 is a legitimate format + } + return messageDigest; + } + + private String getScriptText(File scriptFile, String extension) throws IOException { + final String script; + switch (extension) { + case "rb": + script = JRubyScriptFactory.INSTANCE.getScript(scriptFile); + break; + + case "js": + script = JavaScriptScriptFactory.INSTANCE.getScript(scriptFile); + break; + + case "py": + script = JythonScriptFactory.INSTANCE.getScript(scriptFile); + break; + + default: + script = FileUtils.readFileToString(scriptFile); + } + return script; + } + + private Script getScriptInstance(final Map<String, String> 
properties) throws ScriptException { + + Map<String, Object> localThreadVariables = new HashMap<>(); + final String extension = getExtension(scriptFileName); + String loggerVariableKey = getVariableName("GLOBAL", "logger", extension); + localThreadVariables.put(loggerVariableKey, logger); + String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension); + localThreadVariables.put(propertiesVariableKey, properties); + localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName); + final Bindings bindings = new SimpleBindings(localThreadVariables); + final ScriptEngine scriptEngine = engineFactory.getEngine(extension); + Script instance; + if (compiledScript == null) { + instance = (Script) scriptEngine.eval(scriptText, bindings); + if (instance == null) { // which it will be for python and also for local variables in javascript + instance = (Script) scriptEngine.eval("instance", bindings); + } + } else { + instance = (Script) compiledScript.eval(bindings); + if (instance == null) { // which it will be for python and also for local variables in javascript + instance = (Script) compiledScript.getEngine().eval("instance", bindings); + } + } + instance.setEngine(scriptEngine); + return instance; + } + + /* + * Must have writeLock when calling this!!!! 
+ */ + private void updateEngine() throws IOException, ScriptException { + final String extension = getExtension(scriptFileName); + // if engine is thread safe, it's being reused...if it's a JrubyEngine it + File scriptFile = new File(this.scriptFileName); + ScriptEngine scriptEngine = engineFactory.getNewEngine(scriptFile, extension); + scriptText = getScriptText(scriptFile, extension); + Map<String, Object> localThreadVariables = new HashMap<>(); + String loggerVariableKey = getVariableName("GLOBAL", "logger", extension); + localThreadVariables.put(loggerVariableKey, logger); + String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension); + localThreadVariables.put(propertiesVariableKey, new HashMap<String, String>()); + localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName); + if (scriptEngine instanceof Compilable) { + Bindings bindings = new SimpleBindings(localThreadVariables); + scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE); + compiledScript = ((Compilable) scriptEngine).compile(scriptText); + } + logger.debug("Updating Engine!!"); + } + + private String getVariableName(String scope, String variableName, String extension) { + String result; + switch (extension) { + case "rb": + switch (scope) { + case "GLOBAL": + result = '$' + variableName; + break; + case "INSTANCE": + result = '@' + variableName; + break; + default: + result = variableName; + break; + } + + break; + + default: + result = variableName; + break; + } + return result; + } + + private String getExtension(String aScriptFileName) { + int dotPos = aScriptFileName.lastIndexOf('.'); + if (dotPos < 1) { + throw new IllegalArgumentException("Script file name must have an extension"); + } + final String extension = aScriptFileName.substring(dotPos + 1); + return extension; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java new file mode 100644 index 0000000..7eef98b --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.scripting; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +import javax.script.Invocable; +import javax.script.ScriptException; + +/** + * <p> + * Script authors should extend this class if they want to follow the + * "processCallback" paradigm for NiFi processors. + * </p> + * + * <p> + * At a minimum, scripts must implement + * <code>process(FileInputStream, FileOutputStream)</code>. 
* </p>
 *
 * <p>
 * By default, all files processed will be sent to the relationship
 * <em>success</em>, unless the script raises an exception, in which
 * case the file will be sent to <em>failure</em>. Implement
 * {@link #getProcessorRelationships()} and/or {@link #getRoute()} to change
 * this behavior.
 * </p>
 *
 */
public class WriterScript extends Script {

    /** First callback map that contains a "process" entry; remains null when none was supplied. */
    private Object processCallback;

    /**
     * Creates a script with no callback maps registered.
     */
    public WriterScript() {

    }

    /**
     * Creates a script from callback objects. All callbacks are handed to the
     * superclass; in addition, the first one that is a {@link Map} containing
     * the key {@code "process"} is remembered as the target of
     * {@link #process(InputStream, OutputStream)}.
     *
     * @param callbacks callback objects supplied by the script
     */
    public WriterScript(Object... callbacks) {
        super(callbacks);
        for (Object callback : callbacks) {
            if (callback instanceof Map<?, ?> && ((Map<?, ?>) callback).containsKey("process")) {
                // Only the first matching map is kept, so later ones need not be inspected.
                processCallback = callback;
                break;
            }
        }
    }

    /**
     * Invokes the script's {@code process} callback with the given streams.
     *
     * @param in  stream to read the incoming content from
     * @param out stream to write the outgoing content to
     * @throws NoSuchMethodException if no callback providing {@code process} was registered,
     *             or if the engine cannot find that method on the callback
     * @throws ScriptException if the script fails while running
     */
    public void process(InputStream in, OutputStream out) throws NoSuchMethodException, ScriptException {
        if (processCallback == null) {
            // Fail with the declared exception instead of handing a null receiver to the engine.
            throw new NoSuchMethodException("No callback provides a 'process' function");
        }
        // 'engine' is not declared in this class; presumably inherited from Script.
        final Invocable invocable = (Invocable) engine;
        invocable.invokeMethod(processCallback, "process", in, out);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000..20a3982 --- /dev/null +++ b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.script.ExecuteScript