/**
 * Callback that is handed an {@link OutputStream} to write to.
 *
 * <p>
 * The method declares no checked exceptions, so an implementation has to deal
 * with any {@link java.io.IOException} raised by the stream itself.
 * </p>
 */
public interface OutputStreamHandler {

    /**
     * Writes this handler's content to the supplied stream.
     *
     * @param out the stream to write to
     */
    void write(OutputStream out);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java deleted file mode 100644 index b1d89c0..0000000 --- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ReaderScript.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.scripting; - -import java.io.InputStream; -import java.util.Map; - -import javax.script.Invocable; -import javax.script.ScriptException; - -import org.apache.nifi.processor.Relationship; - -/** - * <p> - * Script authors should extend this class if they want to follow the "reader" - * paradigm for NiFi processors. - * </p> - * - * <p> - * User scripts should implement {@link #route(InputStream)}. 
<code>route</code> - * uses a returned relationship name to determine where FlowFiles go. Scripts - * may also implement {@link #getProcessorRelationships()} to specify available - * relationship names. - * </p> - * - */ -public class ReaderScript extends Script { - - private Object routeCallback; - - public ReaderScript(Object... callbacks) { - super(callbacks); - for (Object callback : callbacks) { - if (callback instanceof Map<?, ?>) { - routeCallback = routeCallback == null && ((Map<?, ?>) callback).containsKey("route") ? callback : routeCallback; - } - } - } - - public ReaderScript() { - - } - - // Simple helper - public void process(InputStream input) throws NoSuchMethodException, ScriptException { - lastRoute = route(input); - } - - /** - * Subclasses should examine the provided inputstream, then determine which - * relationship the file will be sent down and return its name. - * - * - * @param in a Java InputStream containing the incoming FlowFile. - * @return a relationship name - * @throws ScriptException - * @throws NoSuchMethodException - */ - public Relationship route(InputStream in) throws NoSuchMethodException, ScriptException { - Relationship relationship = null; - Invocable invocable = (Invocable) this.engine; - relationship = (Relationship) invocable.invokeMethod(routeCallback, "route", in); - return relationship; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java deleted file mode 100644 index 786f541..0000000 --- 
a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/Script.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.scripting; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.script.Invocable; -import javax.script.ScriptEngine; -import javax.script.ScriptException; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.Relationship; - -/** - * <p> - * Base class for all scripts. In this framework, only ScriptEngines that - * implement javax.script.Invocable are supported. 
- * - * </p> - * - */ -public class Script { - - public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder() - .name("success") - .description("Destination of successfully created flow files") - .build(); - public static final Relationship FAIL_RELATIONSHIP = new Relationship.Builder() - .name("failure") - .description("Destination of flow files when a error occurs in the script") - .build(); - - static final Set<Relationship> RELATIONSHIPS; - - static { - Set<Relationship> rels = new HashSet<>(); - rels.add(FAIL_RELATIONSHIP); - rels.add(SUCCESS_RELATIONSHIP); - RELATIONSHIPS = Collections.unmodifiableSet(rels); - } - - FlowFile flowFile = null; - ScriptEngine engine = null; - - protected Map<String, String> properties = new HashMap<>(); - protected Relationship lastRoute = SUCCESS_RELATIONSHIP; - protected ProcessorLog logger; - protected String scriptFileName; - protected Map<String, String> attributes = new HashMap<>(); - protected long flowFileSize = 0; - protected long flowFileEntryDate = System.currentTimeMillis(); - - // the following are needed due to an inadequate JavaScript ScriptEngine. It will not allow - // subclassing a Java Class, only implementing a Java Interface. So, the syntax of JavaScript - // scripts looks like subclassing, but actually is just constructing a Script instance and - // passing in functions as args to the constructor. When we move to Nashorn JavaScript ScriptEngine - // in Java 8, we can get rid of these and revert the subclasses of this class to abstract. - protected Object propDescCallback; - protected Object relationshipsCallback; - protected Object validateCallback; - protected Object exceptionRouteCallback; - - /** - * Create a Script without any parameters - */ - public Script() { - } - - public Script(Object... 
callbacks) { - for (Object callback : callbacks) { - if (callback instanceof Map<?, ?>) { - propDescCallback = propDescCallback == null && ((Map<?, ?>) callback).containsKey("getPropertyDescriptors") ? callback - : propDescCallback; - relationshipsCallback = relationshipsCallback == null && ((Map<?, ?>) callback).containsKey("getRelationships") ? callback - : relationshipsCallback; - validateCallback = validateCallback == null && ((Map<?, ?>) callback).containsKey("validate") ? callback : validateCallback; - exceptionRouteCallback = exceptionRouteCallback == null && ((Map<?, ?>) callback).containsKey("getExceptionRoute") ? callback - : exceptionRouteCallback; - } - } - } - - /** - * Specify a set of properties with corresponding NiFi validators. - * - * Subclasses that do not override this method will still have access to all - * properties via the "properties" field - * - * @return a list of PropertyDescriptors - * @throws ScriptException - * @throws NoSuchMethodException - */ - @SuppressWarnings("unchecked") - public List<PropertyDescriptor> getPropertyDescriptors() throws NoSuchMethodException, ScriptException { - if (propDescCallback != null) { - return (List<PropertyDescriptor>) ((Invocable) engine).invokeMethod(propDescCallback, "getPropertyDescriptors", (Object) null); - } - return Collections.emptyList(); - } - - /** - * Specify a set of reasons why this processor should be invalid. - * - * Subclasses that do not override this method will depend only on - * individual property validators as specified in - * {@link #getPropertyDescriptors()}. - * - * @return a Collection of messages to display to the user, or an empty - * Collection if the processor configuration is OK. 
- * @throws ScriptException - * @throws NoSuchMethodException - */ - @SuppressWarnings("unchecked") - public Collection<String> validate() throws NoSuchMethodException, ScriptException { - if (validateCallback != null) { - return (Collection<String>) ((Invocable) engine).invokeMethod(validateCallback, "validate", (Object) null); - } - return Collections.emptyList(); - } - - void setFlowFile(FlowFile ff) { - flowFile = ff; - if (null != ff) { - // have to clone because ff.getAttributes is unmodifiable - this.attributes = new HashMap<>(ff.getAttributes()); - this.flowFileSize = ff.getSize(); - this.flowFileEntryDate = ff.getEntryDate(); - } - } - - void setProperties(Map<String, String> map) { - properties = new HashMap<>(map); - } - - /** - * Required to access entire properties map -- Jython (at least) won't let - * you read the member variable without a getter - * - * @return entire parameter map - */ - // change back to protected when we get nashorn - public Map<String, String> getProperties() { - return properties; - } - - /** - * Get the named parameter. Some scripting languages make a method call - * easier than accessing a member field, so this is a convenience method to - * look up values in the properties field. - * - * @param key a hash key - * @return the value pointed at by the key specified - */ - public String getProperty(String key) { - return properties.get(key); - } - - /** - * Name the various relationships by which a file can leave this processor. - * Subclasses may override this method to change available relationships. 
- * - * @return a collection of relationship names - * @throws ScriptException - * @throws NoSuchMethodException - */ - @SuppressWarnings("unchecked") - public Collection<Relationship> getRelationships() throws NoSuchMethodException, ScriptException { - if (relationshipsCallback != null) { - return (Collection<Relationship>) ((Invocable) engine).invokeMethod(relationshipsCallback, "getRelationships", (Object) null); - } - return RELATIONSHIPS; - } - - /** - * Determine what do with a file that has just been processed. - * - * After a script runs its "read" or "write" method, it should update the - * "lastRoute" field to specify the relationship to which the resulting file - * will be sent. - * - * @return a relationship name - */ - public Relationship getRoute() { - return lastRoute; - } - - // Required because of a potential issue in Rhino -- protected methods are visible in - // subclasses but protected fields (like "lastRoute") are not - // change back to protected when we get nashorn - public void setRoute(Relationship route) { - lastRoute = route; - } - - /** - * Determine where to send a file if an exception is thrown during - * processing. - * - * Subclasses may override this method to use a different relationship, or - * to determine the relationship dynamically. Returning null causes the file - * to be deleted instead. - * - * Defaults to "failure". - * - * @return the name of the relationship to use in event of an exception, or - * null to delete the file. 
- * @throws ScriptException - * @throws NoSuchMethodException - */ - public Relationship getExceptionRoute() throws NoSuchMethodException, ScriptException { - if (exceptionRouteCallback != null) { - return (Relationship) ((Invocable) engine).invokeMethod(exceptionRouteCallback, "getExceptionRoute", (Object) null); - } - return FAIL_RELATIONSHIP; - } - - /* - * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get - * the incoming flow file size. - */ - // Change back to protected when we get nashorn - public long getFlowFileSize() { - return flowFileSize; - } - - /* - * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to get - * entry date of the flow file. - */ - // Change back to protected when we get nashorn - public long getFlowFileEntryDate() { - return flowFileEntryDate; - } - - void setLogger(ProcessorLog logger) { - this.logger = logger; - } - - /* - * Required so that scripts in some languages can read access the attribute. Jython (at least) won't let you read the member - * variable without a getter - */ - protected ProcessorLog getLogger() { - return this.logger; - } - - void setFileName(String scriptFileName) { - this.scriptFileName = scriptFileName; - } - - public String getFileName() { - return this.scriptFileName; - } - - // this one's public because it's needed by ExecuteScript to update the flow file's attributes AFTER processing is done - public Map<String, String> getAttributes() { - return this.attributes; - } - - /* - * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to look - * up values in the attributes field. 
- */ - // Change back to protected when we get nashorn - public String getAttribute(String key) { - return this.attributes.get(key); - } - - /* - * Some scripting languages make a method call easier than accessing a member field, so this is a convenience method to set - * key/value pairs in the attributes field. - */ - // Change back to protected when we get nashorn - public void setAttribute(String key, String value) { - this.attributes.put(key, value); - } - - void setEngine(ScriptEngine scriptEngine) { - this.engine = scriptEngine; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java deleted file mode 100644 index 6f38886..0000000 --- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptEngineFactory.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.scripting; - -import java.io.File; -import java.util.concurrent.ConcurrentHashMap; - -import javax.script.ScriptEngine; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; - -import org.apache.commons.lang3.StringUtils; -import org.jruby.embed.PropertyName; - -public class ScriptEngineFactory { - - private static final String THREADING = "THREADING"; - private static final String MULTITHREADED = "MULTITHREADED"; - private static final String STATELESS = "STATELESS"; - private static final String THREAD_ISOLATED = "THREAD-ISOLATED"; - final static ScriptEngineManager scriptEngMgr; - - static { - System.setProperty(PropertyName.LOCALCONTEXT_SCOPE.toString(), "singlethread"); - System.setProperty(PropertyName.COMPILEMODE.toString(), "jit"); - System.setProperty(PropertyName.COMPATVERSION.toString(), "JRuby1.9"); - System.setProperty(PropertyName.LOCALVARIABLE_BEHAVIOR.toString(), "transient"); - System.setProperty("compile.invokedynamic", "false"); - System.setProperty(PropertyName.LAZINESS.toString(), "true"); - scriptEngMgr = new ScriptEngineManager(); - } - final ConcurrentHashMap<String, ScriptEngine> threadSafeEngines = new ConcurrentHashMap<>(); - - ScriptEngine getEngine(String extension) { - ScriptEngine engine = threadSafeEngines.get(extension); - if (null == engine) { - engine = scriptEngMgr.getEngineByExtension(extension); - if (null == engine) { - throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension); - } - - Object threading = engine.getFactory().getParameter(THREADING); - // the MULTITHREADED status means that the scripts need to be careful about sharing state - if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) { - ScriptEngine cachedEngine = threadSafeEngines.putIfAbsent(extension, engine); - if (null != 
cachedEngine) { - engine = cachedEngine; - } - } - } - return engine; - } - - ScriptEngine getNewEngine(File scriptFile, String extension) throws ScriptException { - ScriptEngine engine = scriptEngMgr.getEngineByExtension(extension); - if (null == engine) { - throw new IllegalArgumentException("No ScriptEngine exists for extension " + extension); - } - // Initialize some paths - StringBuilder sb = new StringBuilder(); - switch (extension) { - case "rb": - String parent = scriptFile.getParent(); - parent = StringUtils.replace(parent, "\\", "/"); - sb.append("$:.unshift '") - .append(parent) - .append("'\n") - .append("$:.unshift File.join '") - .append(parent) - .append("', 'lib'\n"); - engine.eval(sb.toString()); - - break; - case "py": - parent = scriptFile.getParent(); - parent = StringUtils.replace(parent, "\\", "/"); - String lib = parent + "/lib"; - sb.append("import sys\n").append("sys.path.append('").append(parent) - .append("')\n").append("sys.path.append('") - .append(lib) - .append("')\n") - .append("__file__ = '") - .append(scriptFile.getAbsolutePath()) - .append("'\n"); - engine.eval(sb.toString()); - break; - default: - break; - } - - Object threading = engine.getFactory().getParameter(THREADING); - // the MULTITHREADED status means that the scripts need to be careful about sharing state - if (THREAD_ISOLATED.equals(threading) || STATELESS.equals(threading) || MULTITHREADED.equals(threading)) { - // replace prior instance if any - threadSafeEngines.put(extension, engine); - } - return engine; - } - - boolean isThreadSafe(String scriptExtension) { - return threadSafeEngines.containsKey(scriptExtension); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java ---------------------------------------------------------------------- diff --git 
a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java deleted file mode 100644 index da18606..0000000 --- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/ScriptFactory.java +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.scripting; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.security.DigestInputStream; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - -import javax.script.Bindings; -import javax.script.Compilable; -import javax.script.CompiledScript; -import javax.script.ScriptContext; -import javax.script.ScriptEngine; -import javax.script.ScriptException; -import javax.script.SimpleBindings; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.io.BufferedInputStream; -import org.apache.nifi.logging.ProcessorLog; - -import org.apache.commons.io.FileUtils; - -/** - * While this is a 'factory', it is not a singleton because we want a factory - * per processor. This factory has state, all of which belong to only one - * processor. 
- * - */ -public class ScriptFactory { - - private final ScriptEngineFactory engineFactory = new ScriptEngineFactory(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final ReadLock readLock = lock.readLock(); - private final WriteLock writeLock = lock.writeLock(); - private final ProcessorLog logger; - - private volatile CompiledScript compiledScript; - private volatile String scriptText; - private volatile byte[] md5Hash; - private volatile long lastTimeChecked; - private volatile String scriptFileName; - private volatile long scriptCheckIntervalMS = 15000; - - public ScriptFactory(ProcessorLog logger) { - this.logger = logger; - } - - public void setScriptCheckIntervalMS(long msecs) { - this.scriptCheckIntervalMS = msecs; - } - - /** - * @param aScriptFileName - * @param properties - * @param flowFile - * @return - * @throws IOException - * @throws ScriptException - */ - public Script getScript(final String aScriptFileName, final Map<String, String> properties, final FlowFile flowFile) - throws IOException, ScriptException { - final Script instance; - long now = System.currentTimeMillis(); - readLock.lock(); - try { - if (!aScriptFileName.equals(this.scriptFileName)) { - readLock.unlock(); - writeLock.lock(); - try { - if (!aScriptFileName.equals(this.scriptFileName)) { - // need to get brand new engine - compiledScript = null; - this.md5Hash = getMD5Hash(aScriptFileName); - this.lastTimeChecked = now; - this.scriptFileName = aScriptFileName; - updateEngine(); - } // else another thread beat me to the change...so just get a script - } finally { - readLock.lock(); - writeLock.unlock(); - } - } else if (lastTimeChecked + scriptCheckIntervalMS < now) { - readLock.unlock(); - writeLock.lock(); - try { - if (lastTimeChecked + scriptCheckIntervalMS < now) { - byte[] md5 = getMD5Hash(this.scriptFileName); - if (!MessageDigest.isEqual(md5Hash, md5)) { - // need to get brand new engine - compiledScript = null; - updateEngine(); - 
this.md5Hash = md5; - } // else no change to script, so just update time checked - this.lastTimeChecked = now; - } // else another thread beat me to the check...so just get a script - } finally { - readLock.lock(); - writeLock.unlock(); - } - } - try { - instance = getScriptInstance(properties); - instance.setFileName(this.scriptFileName); - instance.setProperties(properties); - instance.setLogger(logger); - instance.setFlowFile(flowFile); - } catch (ScriptException e) { - // need to reset state to enable re-initialization - this.lastTimeChecked = 0; - this.scriptFileName = null; - throw e; - } - } finally { - readLock.unlock(); - } - - return instance; - - } - - public Script getScript(String aScriptFileName) throws ScriptException, IOException { - Map<String, String> props = new HashMap<>(); - return getScript(aScriptFileName, props, null); - } - - private byte[] getMD5Hash(String aScriptFileName) throws FileNotFoundException, IOException { - byte[] messageDigest = null; - try (FileInputStream fis = new FileInputStream(aScriptFileName); - DigestInputStream dis = new DigestInputStream(new BufferedInputStream(fis), MessageDigest.getInstance("MD5"))) { - - byte[] bytes = new byte[8192]; - while (dis.read(bytes) != -1) { - // do nothing...just computing the md5 hash - } - messageDigest = dis.getMessageDigest().digest(); - } catch (NoSuchAlgorithmException swallow) { - // MD5 is a legitimate format - } - return messageDigest; - } - - private String getScriptText(File scriptFile, String extension) throws IOException { - final String script; - switch (extension) { - case "rb": - script = JRubyScriptFactory.INSTANCE.getScript(scriptFile); - break; - - case "js": - script = JavaScriptScriptFactory.INSTANCE.getScript(scriptFile); - break; - - case "py": - script = JythonScriptFactory.INSTANCE.getScript(scriptFile); - break; - - default: - script = FileUtils.readFileToString(scriptFile); - } - return script; - } - - private Script getScriptInstance(final Map<String, String> 
properties) throws ScriptException { - - Map<String, Object> localThreadVariables = new HashMap<>(); - final String extension = getExtension(scriptFileName); - String loggerVariableKey = getVariableName("GLOBAL", "logger", extension); - localThreadVariables.put(loggerVariableKey, logger); - String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension); - localThreadVariables.put(propertiesVariableKey, properties); - localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName); - final Bindings bindings = new SimpleBindings(localThreadVariables); - final ScriptEngine scriptEngine = engineFactory.getEngine(extension); - Script instance; - if (compiledScript == null) { - instance = (Script) scriptEngine.eval(scriptText, bindings); - if (instance == null) { // which it will be for python and also for local variables in javascript - instance = (Script) scriptEngine.eval("instance", bindings); - } - } else { - instance = (Script) compiledScript.eval(bindings); - if (instance == null) { // which it will be for python and also for local variables in javascript - instance = (Script) compiledScript.getEngine().eval("instance", bindings); - } - } - instance.setEngine(scriptEngine); - return instance; - } - - /* - * Must have writeLock when calling this!!!! 
- */ - private void updateEngine() throws IOException, ScriptException { - final String extension = getExtension(scriptFileName); - // if engine is thread safe, it's being reused...if it's a JrubyEngine it - File scriptFile = new File(this.scriptFileName); - ScriptEngine scriptEngine = engineFactory.getNewEngine(scriptFile, extension); - scriptText = getScriptText(scriptFile, extension); - Map<String, Object> localThreadVariables = new HashMap<>(); - String loggerVariableKey = getVariableName("GLOBAL", "logger", extension); - localThreadVariables.put(loggerVariableKey, logger); - String propertiesVariableKey = getVariableName("INSTANCE", "properties", extension); - localThreadVariables.put(propertiesVariableKey, new HashMap<String, String>()); - localThreadVariables.put(ScriptEngine.FILENAME, scriptFileName); - if (scriptEngine instanceof Compilable) { - Bindings bindings = new SimpleBindings(localThreadVariables); - scriptEngine.setBindings(bindings, ScriptContext.ENGINE_SCOPE); - compiledScript = ((Compilable) scriptEngine).compile(scriptText); - } - logger.debug("Updating Engine!!"); - } - - private String getVariableName(String scope, String variableName, String extension) { - String result; - switch (extension) { - case "rb": - switch (scope) { - case "GLOBAL": - result = '$' + variableName; - break; - case "INSTANCE": - result = '@' + variableName; - break; - default: - result = variableName; - break; - } - - break; - - default: - result = variableName; - break; - } - return result; - } - - private String getExtension(String aScriptFileName) { - int dotPos = aScriptFileName.lastIndexOf('.'); - if (dotPos < 1) { - throw new IllegalArgumentException("Script file name must have an extension"); - } - final String extension = aScriptFileName.substring(dotPos + 1); - return extension; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java deleted file mode 100644 index 7eef98b..0000000 --- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/java/org/apache/nifi/scripting/WriterScript.java +++ /dev/null @@ -1,67 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.scripting;

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import javax.script.Invocable;
import javax.script.ScriptException;

/**
 * <p>
 * Base class for scripts that follow the "processCallback" paradigm for NiFi
 * processors: the script receives the incoming content as a stream and writes
 * the outgoing content to another stream.
 * </p>
 *
 * <p>
 * At a minimum, a script must provide
 * {@link #process(InputStream, OutputStream)}.
 * </p>
 *
 * <p>
 * By default every processed file is sent to the <em>success</em>
 * relationship, unless the script raises an exception, in which case the file
 * is sent to <em>failure</em>. Override {@code getRelationships()} and/or
 * {@code getRoute()} to change this behavior.
 * </p>
 *
 */
public class WriterScript extends Script {

    /** First callback map that offers a "process" entry, or null if none was supplied. */
    private Object processCallback;

    public WriterScript() {

    }

    /**
     * Creates a script backed by callback objects handed over by the script
     * engine. The first {@link Map} callback containing the key "process" is
     * remembered as the target of {@link #process(InputStream, OutputStream)}.
     *
     * @param callbacks the callback objects; also passed on to the superclass
     */
    public WriterScript(Object... callbacks) {
        super(callbacks);
        for (Object candidate : callbacks) {
            if (!(candidate instanceof Map<?, ?>)) {
                continue;
            }
            // Once a callback has been chosen, later maps are not even inspected.
            if (processCallback != null) {
                continue;
            }
            if (((Map<?, ?>) candidate).containsKey("process")) {
                processCallback = candidate;
            }
        }
    }

    /**
     * Delegates to the "process" function of the stored callback object through
     * the script engine.
     *
     * @param in stream to read the incoming content from
     * @param out stream to write the outgoing content to
     * @throws NoSuchMethodException if the callback has no "process" method
     * @throws ScriptException if the script fails while executing
     */
    public void process(InputStream in, OutputStream out) throws NoSuchMethodException, ScriptException {
        ((Invocable) engine).invokeMethod(processCallback, "process", in, out);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor deleted file mode 100644 index 20a3982..0000000 --- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -org.apache.nifi.processors.script.ExecuteScript http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html ---------------------------------------------------------------------- diff --git a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html b/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html deleted file mode 100644 index acb47c5..0000000 --- a/nar-bundles/execute-script-bundle/execute-script-processors/src/main/resources/docs/org.apache.nifi.processors.script.ExecuteScript/index.html +++ /dev/null @@ -1,264 +0,0 @@ -<!DOCTYPE html> -<html lang="en"> - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - <head> - <meta charset="utf-8" /> - <title>ExecuteScript</title> - - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> - </head> - - <body> - <!-- Processor Documentation ================================================== --> - <h2>Description:</h2> - <p> - This processor provides the capability to execute scripts in various scripting languages, and passes into the scripts - the input stream and output stream(s) representing an incoming flow file and any created flow files. The processor is designed to be - thread safe, so multiple concurrent tasks may execute against a single script. The processor provides a framework which enables - script writers to implement 3 different types of scripts: - <ul> - <li>ReaderScript - which enables stream-based reading of a FlowFile's content</li> - <li>WriterScript - which enables stream-based reading and writing/modifying of a FlowFile's content</li> - <li>ConverterScript - which enables stream-based reading of a FlowFile's content and stream-based writing to newly created FlowFiles</li> - </ul> - Presently, the processor supports 3 scripting languages: Ruby, Python, and JavaScript. The processor is built on the - javax.script API which enables ScriptEngine discovery, thread management, and encapsulates much of the low level bridging-code that - enables Java to Script language integration. Thus, it is designed to be easily extended to other scripting languages. <br/> - The attributes of a FlowFile and properties of the Processor are exposed to the script by either a variable in the base class or - a getter method. 
A script may declare new Processor Properties and different Relationships via overriding the getPropertyDescriptors - and getRelationships methods, respectively. - </p> - The processor provides some boilerplate script to aid in the creation of the three different types of scripts. For example, - the processor provides import statements for classes commonly used within a processor. - <pre> - 'org.apache.nifi.components.PropertyDescriptor' - 'org.apache.nifi.components.Validator' - 'org.apache.nifi.processor.util.StandardValidators' - 'org.apache.nifi.processor.Relationship' - 'org.apache.nifi.logging.ProcessorLog' - 'org.apache.nifi.scripting.ReaderScript' - 'org.apache.nifi.scripting.WriterScript' - 'org.apache.nifi.scripting.ConverterScript' - </pre> - The processor appends to the script's execution path the parent directory of the specified script file and a sub-directory - called 'lib', which may be useful for supporting scripts. </p> -<p> - <strong>Shared Variables</strong> -</p> -The following variables are provided as shared variables for the scripts: -<ul> - <li>logger - <ul> - <li> The processor's logger </li> - <li> Scope is GLOBAL, thus in Ruby the syntax is $logger</li> - </ul> - </li> - <li>properties - <ul> - <li> A Map of the processor's configuration properties; key and value are strings</li> - <li> Scope is INSTANCE, thus in Ruby the syntax is @properties</li> - </ul> - </li> -</ul> -<p> - <strong>Properties:</strong> -</p> -<p> - In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered - optional. If a property has a default value, it is indicated. If a property supports the use of the NiFi Expression Language - (or simply, "expression language"), that is also indicated. Of particular note: This processor allows scripts to define additional - Processor properties, which will not be initially visible. 
Once the processor's configuration is validated, script-defined properties - will become visible, and may affect the validity of the processor. -</p> -<ul> - <li> - <strong>Script File Name</strong> - <ul> - <li>Script location, can be relative or absolute path.</li> - <li>Default value: no default</li> - <li>Supports expression language: false</li> - </ul> - </li> - <li> - <strong>Script Check Interval</strong> - <ul> - <li>The time period between checking for updates to a script.</li> - <li>Default value: 15 sec</li> - <li>Supports expression language: false</li> - </ul> - </li> -</ul> - -<p> - <strong>Relationships:</strong> -</p> -<p> - The initial 'out of the box' relationships are below. Of particular note is the ability of a script to change the set of - relationships. However, any relationships defined by the script will not be visible until the processor's configuration has been - validated. Once done, new relationships will become visible. -</p> -<ul> - <li> - success - <ul> - <li>Used when a file is successfully processed by a script.</li> - </ul> - </li> - <li> - failure - <ul> - <li>Used when an error occurs while processing a file with a script.</li> - </ul> - </li> -</ul> - -<p> - <strong>Example Scripts:</strong> -</p> -<ul> - JavaScript example - the 'with' statement imports packages defined in the framework and limits the importing to the local scope, - rather than global. The 'Scripting' variable uses the JavaImporter class within JavaScript. Since the 'instance' variable is intended to - be local scope (not global), it must be named 'instance' as it is not passed back to the processor upon script evaluation and must be - fetched. If you make it global, you can name it whatever you'd like...but this is intended to be multi-threaded so do so at your own - risk.</p> -Presently, there are issues with the JavaScript scripting engine that prevent sub-classing the base classes in the Processor's Java -framework. 
So, what is actually happening is that an instance of ReaderScript is created with a provided callback object. When we are able -to move to a more competent scripting engine (supposedly in Java 8), the code below will remain the same, but the 'instance' variable -will actually be a sub-class of ReaderScript. -<pre> - with (Scripting) { - var instance = new ReaderScript({ - route : function(input) { - var str = IOUtils.toString(input); - var expr = instance.getProperty("expr"); - filename = instance.attributes.get("filename"); - instance.setAttribute("filename", filename + ".modified"); - if (str.match(expr)) { - return Script.FAIL_RELATIONSHIP; - } else { - return Script.SUCCESS_RELATIONSHIP; - } - } - }); - } -</pre> -Ruby example - the 'OutputStreamHandler' is an interface which is called when creating flow files. -<pre> - java_import 'org.apache.nifi.scripting.OutputStreamHandler' - class SimpleConverter &lt; ConverterScript - field_reader :FAIL_RELATIONSHIP, :SUCCESS_RELATIONSHIP, :logger, :attributes - - def convert(input) - in_io = input.to_io - createFlowFile("firstLine", FAIL_RELATIONSHIP, OutputStreamHandler.impl do |method, out| - out_io = out.to_io - out_io &lt;&lt; in_io.readline.to_java_bytes - out_io.close - logger.debug("Wrote data to failure...this message logged with logger from super class") - end) - - createFlowFile("otherLines", SUCCESS_RELATIONSHIP, OutputStreamHandler.impl do |method, out| - out_io = out.to_io - in_io.each_line { |line| - out_io &lt;&lt; line - } - out_io.close - logger.debug("Wrote data to success...this message logged with logger from super class") - end) - in_io.close - end - - end - - $logger.debug("Creating SimpleConverter...this message logged with logger from shared variables") - SimpleConverter.new -</pre> -Python example - The difficulty with Python is that it does not return objects upon script evaluation, so the instance of the Script -class must be fetched by name. Thus, you must define a variable called 'instance'. 
-<pre> - import re - - class RoutingReader(ReaderScript): - A = Relationship.Builder().name("a").description("some good stuff").build() - B = Relationship.Builder().name("b").description("some other stuff").build() - C = Relationship.Builder().name("c").description("some bad stuff").build() - - def getRelationships(self): - return [self.A,self.B,self.C] - - def getExceptionRoute(self): - return self.C - - def route( self, input ): - logger.info("Executing route") - for line in FileUtil.wrap(input): - if re.match("^bad", line, re.IGNORECASE): - return self.B - if re.match("^sed", line): - raise RuntimeError("That's no good!") - - return self.A - logger.debug("Constructing instance") - instance = RoutingReader() - -</pre> -</ul> -<p> - <strong>Script API:</strong> -</p> -<ul> - <li>getAttribute(String) : String</li> - <li>getAttributes() : Map(String,String)</li> - <li>getExceptionRoute() : Relationship</li> - <li>getFileName() : String</li> - <li>getFlowFileEntryDate() : Calendar</li> - <li>getFlowFileSize() : long</li> - <li>getProperties() : Map(String, String)</li> - <li>getProperty(String) : String</li> - <li>getPropertyDescriptors() : List(PropertyDescriptor)</li> - <li>getRelationships() : Collection(Relationship)</li> - <li>getRoute() : Relationship</li> - <li>setRoute(Relationship)</li> - <li>setAttribute(String, String)</li> - <li>validate() : Collection(String)</li> -</ul> -<p> - <strong>ReaderScript API:</strong> -</p> -<ul> - <li>route(InputStream) : Relationship</li> -</ul> -<p> - <strong>WriterScript API:</strong> -</p> -<ul> - <li>process(InputStream, OutputStream)</li> -</ul> -<p> - <strong>ConverterScript API:</strong> -</p> -<ul> - <li>convert(InputStream)</li> - <li>createFlowFile(String, Relationship, OutputStreamHandler)</li> -</ul> -<p> - <strong>OutputStreamHandler API:</strong> -</p> -<ul> - <li>write(OutputStream)</li> -</ul> -</body> -</html>