This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 07fd64d MINIFICPP-1706 Rework engine queue for ExecuteScript
07fd64d is described below
commit 07fd64dfe83f23437f2ed2597740ae371e0c25ad
Author: Gabor Gyimesi <[email protected]>
AuthorDate: Wed Jan 19 16:51:15 2022 +0100
MINIFICPP-1706 Rework engine queue for ExecuteScript
Signed-off-by: Adam Debreceni <[email protected]>
This closes #1232
---
.github/workflows/ci.yml | 2 +-
PROCESSORS.md | 2 +-
cmake/DockerConfig.cmake | 2 +
docker/Dockerfile | 16 +--
.../integration/MiNiFi_integration_test_driver.py | 12 +--
docker/test/integration/features/script.feature | 25 +++++
.../minifi/core/DockerTestDirectoryBindings.py | 1 +
docker/test/integration/minifi/core/ImageStore.py | 1 +
docker/test/integration/minifi/core/Processor.py | 10 +-
.../Minifi_flow_yaml_serializer.py | 3 +-
.../flow_serialization/Nifi_flow_xml_serializer.py | 2 +-
.../integration/minifi/processors/ExecuteScript.py | 8 ++
.../integration/resources/lua/sleep_forever.lua | 12 +++
.../integration/resources/python/sleep_forever.py | 7 ++
docker/test/integration/steps/steps.py | 12 +++
extensions/script/CMakeLists.txt | 5 +-
extensions/script/ExecuteScript.cpp | 109 ++++++++++---------
extensions/script/ExecuteScript.h | 118 ++++++++++++++++++---
extensions/script/tests/CMakeLists.txt | 1 -
extensions/script/tests/LuaScriptEngineTests.cpp | 10 +-
.../TestExecuteScriptProcessorWithLuaScript.cpp | 52 +++++++--
.../TestExecuteScriptProcessorWithPythonScript.cpp | 49 ++++++++-
22 files changed, 353 insertions(+), 106 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 63086ea..23c1c54 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -202,7 +202,7 @@ jobs:
if [ -d ~/.ccache ]; then mv ~/.ccache .; fi
mkdir build
cd build
- cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF
-DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON
-DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON
-DDOCKER_BUILD_ONLY=ON -DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
+ cmake -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DENABLE_JNI=OFF
-DDISABLE_JEMALLOC=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON
-DENABLE_AZURE=ON -DENABLE_SQL=ON -DENABLE_SPLUNK=ON -DENABLE_OPC=ON
-DENABLE_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DDOCKER_BUILD_ONLY=ON
-DDOCKER_CCACHE_DUMP_LOCATION=$HOME/.ccache ..
make docker
- id: install_deps
run: |
diff --git a/PROCESSORS.md b/PROCESSORS.md
index 3446c98..fe56f16 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -471,7 +471,7 @@ In the list below, the names of required properties appear
in bold. Any other pr
|----------------------|---------------|------------------|--------------------------------------------------------------------------------------------------------|
| Module Directory | | | Comma-separated list of paths to files and/or directories which contain modules required by the script |
| Script Body | | | Body of script to execute. Only one of Script File or Script Body may be used |
-| Script Engine | python | python<br>lua | The engine to execute scripts (python, lua) |
+| **Script Engine** | python | python<br>lua | The engine to execute scripts (python, lua) |
| Script File | | | Path to script file to execute. Only one of Script File or Script Body may be used |
### Relationships
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 4c10ccf..6b72df0 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -45,6 +45,8 @@ add_custom_target(
-c ENABLE_ENCRYPT_CONFIG=${ENABLE_ENCRYPT_CONFIG}
-c ENABLE_NANOFI=${ENABLE_NANOFI}
-c ENABLE_SPLUNK=${ENABLE_SPLUNK}
+ -c ENABLE_SCRIPTING=${ENABLE_SCRIPTING}
+ -c ENABLE_LUA_SCRIPTING=${ENABLE_LUA_SCRIPTING}
-c DISABLE_CURL=${DISABLE_CURL}
-c DISABLE_JEMALLOC=${DISABLE_JEMALLOC}
-c DISABLE_CIVET=${DISABLE_CIVET}
diff --git a/docker/Dockerfile b/docker/Dockerfile
index cca67f4..9a8f2e1 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -59,8 +59,9 @@ ARG DISABLE_ROCKSDB=OFF
ARG DISABLE_LIBARCHIVE=OFF
ARG DISABLE_LZMA=OFF
ARG DISABLE_BZIP2=OFF
-ARG ENABLE_SCRIPTING=ON
+ARG ENABLE_SCRIPTING=OFF
ARG DISABLE_PYTHON_SCRIPTING=
+ARG ENABLE_LUA_SCRIPTING=
ARG DISABLE_CONTROLLER=OFF
ARG CMAKE_BUILD_TYPE=Release
@@ -89,7 +90,8 @@ RUN apk --no-cache add gcc \
python3-dev \
boost-dev \
doxygen \
- ccache
+ ccache \
+ lua-dev
ENV USER minificpp
ENV MINIFI_BASE_DIR /opt/minifi
@@ -118,8 +120,8 @@ RUN cmake -DSTATIC_BUILD= -DSKIP_TESTS=true
-DENABLE_ALL="${ENABLE_ALL}" -DENABL
-DDISABLE_CURL="${DISABLE_CURL}" -DDISABLE_JEMALLOC="${DISABLE_JEMALLOC}"
-DDISABLE_CIVET="${DISABLE_CIVET}" -DENABLE_SPLUNK=${ENABLE_SPLUNK} \
-DDISABLE_EXPRESSION_LANGUAGE="${DISABLE_EXPRESSION_LANGUAGE}"
-DDISABLE_ROCKSDB="${DISABLE_ROCKSDB}" \
-DDISABLE_LIBARCHIVE="${DISABLE_LIBARCHIVE}"
-DDISABLE_LZMA="${DISABLE_LZMA}" -DDISABLE_BZIP2="${DISABLE_BZIP2}" \
- -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}"
-DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}"
-DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}" \
- -DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}"
-DAWS_ENABLE_UNITY_BUILD=OFF -DEXCLUDE_BOOST=ON
-DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
+ -DENABLE_SCRIPTING="${ENABLE_SCRIPTING}"
-DDISABLE_PYTHON_SCRIPTING="${DISABLE_PYTHON_SCRIPTING}"
-DENABLE_LUA_SCRIPTING="${ENABLE_LUA_SCRIPTING}" \
+ -DDISABLE_CONTROLLER="${DISABLE_CONTROLLER}"
-DENABLE_ENCRYPT_CONFIG="${ENABLE_ENCRYPT_CONFIG}" -DAWS_ENABLE_UNITY_BUILD=OFF
-DEXCLUDE_BOOST=ON -DCMAKE_BUILD_TYPE="${CMAKE_BUILD_TYPE}" .. && \
make -j "$(nproc)" package && \
tar -xzvf
"${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}.tar.gz" -C
"${MINIFI_BASE_DIR}"
@@ -139,8 +141,9 @@ ARG ENABLE_USB_CAMERA=OFF
ARG ENABLE_OPENCV=OFF
ARG ENABLE_PYTHON=OFF
ARG ENABLE_BUSTACHE=OFF
-ARG ENABLE_SCRIPTING=ON
+ARG ENABLE_SCRIPTING=OFF
ARG DISABLE_PYTHON_SCRIPTING=
+ARG ENABLE_LUA_SCRIPTING=
# Add testing repo for rocksdb
RUN echo 'http://dl-cdn.alpinelinux.org/alpine/edge/testing' >>
/etc/apk/repositories
@@ -160,7 +163,8 @@ RUN addgroup -g ${GID} ${USER} && adduser -u ${UID} -D -G
${USER} -g "" ${USER}
if [ "$ENABLE_PCAP" = "ON" ]; then apk add --no-cache libpcap; fi && \
if [ "$ENABLE_USB_CAMERA" = "ON" ]; then apk add --no-cache libpng libusb;
fi && \
if [ "$ENABLE_OPENCV" = "ON" ] || [ "$ENABLE_BUSTACHE" = "ON" ]; then apk
add --no-cache boost; fi && \
- if { [ "$ENABLE_PYTHON" = "ON" ] || [ "$ENABLE_SCRIPTING" = "ON" ]; } && [
-z "$DISABLE_PYTHON_SCRIPTING" ]; then apk add --no-cache python3-dev; fi
+ if [ "$ENABLE_SCRIPTING" = "ON" ] && [ -n "$ENABLE_LUA_SCRIPTING" ]; then
apk add --no-cache lua; fi && \
+ if [ "$ENABLE_SCRIPTING" = "ON" ] && [ -z "$DISABLE_PYTHON_SCRIPTING" ];
then apk add --no-cache python3; fi
# Copy built minifi distribution from builder
COPY --from=build --chown=${USER}:${USER} ${MINIFI_VERSIONED_HOME}
${MINIFI_HOME}
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py
b/docker/test/integration/MiNiFi_integration_test_driver.py
index df0015b..2a576cd 100644
--- a/docker/test/integration/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -203,21 +203,21 @@ class MiNiFi_integration_test():
def check_splunk_event_with_attributes(self, splunk_container_name, query,
attributes):
assert
self.cluster.check_splunk_event_with_attributes(splunk_container_name, query,
attributes)
- def check_minifi_log_contents(self, line, timeout_seconds=60):
- self.check_container_log_contents("minifi-cpp", line, timeout_seconds)
+ def check_minifi_log_contents(self, line, timeout_seconds=60, count=1):
+ self.check_container_log_contents("minifi-cpp", line, timeout_seconds,
count)
- def check_minifi_log_matches_regex(self, regex, timeout_seconds=60):
+ def check_minifi_log_matches_regex(self, regex, timeout_seconds=60,
count=1):
for container in self.cluster.containers.values():
if container.get_engine() == "minifi-cpp":
- line_found =
self.cluster.wait_for_app_logs_regex(container.get_name(), regex,
timeout_seconds)
+ line_found =
self.cluster.wait_for_app_logs_regex(container.get_name(), regex,
timeout_seconds, count)
if line_found:
return
assert False
- def check_container_log_contents(self, container_engine, line,
timeout_seconds=60):
+ def check_container_log_contents(self, container_engine, line,
timeout_seconds=60, count=1):
for container in self.cluster.containers.values():
if container.get_engine() == container_engine:
- line_found =
self.cluster.wait_for_app_logs(container.get_name(), line, timeout_seconds)
+ line_found =
self.cluster.wait_for_app_logs(container.get_name(), line, timeout_seconds,
count)
if line_found:
return
assert False
diff --git a/docker/test/integration/features/script.feature
b/docker/test/integration/features/script.feature
new file mode 100644
index 0000000..752a142
--- /dev/null
+++ b/docker/test/integration/features/script.feature
@@ -0,0 +1,25 @@
+Feature: MiNiFi can execute Lua and Python scripts
+ Background:
+ Given the content of "/tmp/output" is monitored
+
+ Scenario: ExecuteScript should only allow the number of parallel tasks
defined by the max concurrent tasks attribute for Lua scripts
+ Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
+ And the scheduling period of the GenerateFlowFile processor is set to "500
ms"
+ And a ExecuteScript processor with the "Script File" property set to
"/tmp/resources/lua/sleep_forever.lua"
+ And the "Script Engine" property of the ExecuteScript processor is set to
"lua"
+ And the max concurrent tasks attribute of the ExecuteScript processor is
set to 3
+ And the "success" relationship of the GenerateFlowFile processor is
connected to the ExecuteScript
+
+ When all instances start up
+ Then the Minifi logs contain the following message: "Sleeping forever" 3
times after 5 seconds
+
+ Scenario: ExecuteScript should only allow one Python script running at a time
+ Given a GenerateFlowFile processor with the "File Size" property set to
"0B"
+ And the scheduling period of the GenerateFlowFile processor is set to "500
ms"
+ And a ExecuteScript processor with the "Script File" property set to
"/tmp/resources/python/sleep_forever.py"
+ And the "Script Engine" property of the ExecuteScript processor is set to
"python"
+ And the max concurrent tasks attribute of the ExecuteScript processor is
set to 3
+ And the "success" relationship of the GenerateFlowFile processor is
connected to the ExecuteScript
+
+ When all instances start up
+ Then the Minifi logs contain the following message: "Sleeping forever" 1
times after 5 seconds
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
index 1673327..0b3f1ec 100644
--- a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -26,6 +26,7 @@ class DockerTestDirectoryBindings:
shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs",
self.data_directories[test_id]["resources_dir"] + "/certs")
shutil.copytree(test_dir + "/resources/python",
self.data_directories[test_id]["resources_dir"] + "/python")
shutil.copytree(test_dir + "/resources/opcua",
self.data_directories[test_id]["resources_dir"] + "/opcua")
+ shutil.copytree(test_dir + "/resources/lua",
self.data_directories[test_id]["resources_dir"] + "/lua")
def get_data_directories(self, test_id):
return self.data_directories[test_id]
diff --git a/docker/test/integration/minifi/core/ImageStore.py
b/docker/test/integration/minifi/core/ImageStore.py
index bfd03c5..c602344 100644
--- a/docker/test/integration/minifi/core/ImageStore.py
+++ b/docker/test/integration/minifi/core/ImageStore.py
@@ -77,6 +77,7 @@ class ImageStore:
echo "Password = password" >> /etc/odbc.ini && \
echo "Database = postgres" >> /etc/odbc.ini
RUN sed -i -e 's/INFO/DEBUG/g'
{minifi_root}/conf/minifi-log.properties
+ RUN echo nifi.flow.engine.threads=5 >>
{minifi_root}/conf/minifi.properties
USER minificpp
""".format(base_image='apacheminificpp:' +
MinifiContainer.MINIFI_VERSION,
minifi_root=MinifiContainer.MINIFI_ROOT))
diff --git a/docker/test/integration/minifi/core/Processor.py
b/docker/test/integration/minifi/core/Processor.py
index 97e0ee6..161926b 100644
--- a/docker/test/integration/minifi/core/Processor.py
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -9,7 +9,8 @@ class Processor(Connectable):
name=None,
controller_services=None,
auto_terminate=None,
- class_prefix='org.apache.nifi.processors.standard.'):
+ class_prefix='org.apache.nifi.processors.standard.',
+ max_concurrent_tasks=1):
super(Processor, self).__init__(name=name,
auto_terminate=auto_terminate)
@@ -24,12 +25,10 @@ class Processor(Connectable):
if properties is None:
properties = {}
- if name is None:
- pass
-
self.clazz = clazz
self.properties = properties
self.controller_services = controller_services
+ self.max_concurrent_tasks = max_concurrent_tasks
self.schedule = {
'scheduling strategy': 'TIMER_DRIVEN',
@@ -46,6 +45,9 @@ class Processor(Connectable):
else:
self.properties[key] = value
+ def set_max_concurrent_tasks(self, max_concurrent_tasks):
+ self.max_concurrent_tasks = max_concurrent_tasks
+
def unset_property(self, key):
self.properties.pop(key, None)
diff --git
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
index 25b50a3..430bb48 100644
---
a/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
+++
b/docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py
@@ -80,7 +80,8 @@ class Minifi_flow_yaml_serializer:
'yield period': connectable.schedule['yield period'],
'run duration nanos': connectable.schedule['run duration
nanos'],
'Properties': connectable.properties,
- 'auto-terminated relationships list':
connectable.auto_terminate
+ 'auto-terminated relationships list':
connectable.auto_terminate,
+ 'max concurrent tasks': connectable.max_concurrent_tasks
})
for svc in connectable.controller_services:
diff --git
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
index cc1125d..391e3fa 100644
---
a/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
+++
b/docker/test/integration/minifi/flow_serialization/Nifi_flow_xml_serializer.py
@@ -120,7 +120,7 @@ class Nifi_flow_xml_serializer:
conn_destination.append(proc_bundle)
proc_max_concurrent_tasks = Element('maxConcurrentTasks')
- proc_max_concurrent_tasks.text = '1'
+ proc_max_concurrent_tasks.text =
str(connectable.max_concurrent_tasks)
conn_destination.append(proc_max_concurrent_tasks)
proc_scheduling_period = Element('schedulingPeriod')
diff --git a/docker/test/integration/minifi/processors/ExecuteScript.py
b/docker/test/integration/minifi/processors/ExecuteScript.py
new file mode 100644
index 0000000..3ff3bc7
--- /dev/null
+++ b/docker/test/integration/minifi/processors/ExecuteScript.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+
+class ExecuteScript(Processor):
+ def __init__(self):
+ super(ExecuteScript, self).__init__(
+ 'ExecuteScript',
+ auto_terminate=['success', 'failure'])
diff --git a/docker/test/integration/resources/lua/sleep_forever.lua
b/docker/test/integration/resources/lua/sleep_forever.lua
new file mode 100644
index 0000000..96d4e38
--- /dev/null
+++ b/docker/test/integration/resources/lua/sleep_forever.lua
@@ -0,0 +1,12 @@
+function sleep(n)
+ local t0 = os.clock()
+ while os.clock() - t0 <= n do end
+end
+
+function onTrigger(context, session)
+ log:info('Sleeping forever')
+
+ while true do
+ sleep(1)
+ end
+end
diff --git a/docker/test/integration/resources/python/sleep_forever.py
b/docker/test/integration/resources/python/sleep_forever.py
new file mode 100644
index 0000000..3e8952c
--- /dev/null
+++ b/docker/test/integration/resources/python/sleep_forever.py
@@ -0,0 +1,7 @@
+import time
+
+
+def onTrigger(context, session):
+ log.info("Sleeping forever")
+ while True:
+ time.sleep(1)
diff --git a/docker/test/integration/steps/steps.py
b/docker/test/integration/steps/steps.py
index 183e6eb..cc27d4d 100644
--- a/docker/test/integration/steps/steps.py
+++ b/docker/test/integration/steps/steps.py
@@ -152,6 +152,12 @@ def step_impl(context, property_name, processor_name_one,
processor_name_two):
context.test.get_node_by_name(processor_name_two).set_property(property_name,
uuid_str)
+@given("the max concurrent tasks attribute of the {processor_name} processor
is set to {max_concurrent_tasks:d}")
+def step_impl(context, processor_name, max_concurrent_tasks):
+ processor = context.test.get_node_by_name(processor_name)
+ processor.set_max_concurrent_tasks(max_concurrent_tasks)
+
+
@given("the \"{property_name}\" property of the {processor_name} processor is
set to match {key_attribute_encoding} encoded kafka message key
\"{message_key}\"")
def step_impl(context, property_name, processor_name, key_attribute_encoding,
message_key):
encoded_key = ""
@@ -615,6 +621,12 @@ def step_impl(context, log_message, duration):
context.test.check_minifi_log_contents(log_message, timeparse(duration))
+@then("the Minifi logs contain the following message: \"{log_message}\"
{count:d} times after {seconds:d} seconds")
+def step_impl(context, log_message, count, seconds):
+ time.sleep(seconds)
+ context.test.check_minifi_log_contents(log_message, 1, count)
+
+
@then("the Minifi logs match the following regex: \"{regex}\" in less than
{duration}")
def step_impl(context, regex, duration):
context.test.check_minifi_log_matches_regex(regex, timeparse(duration))
diff --git a/extensions/script/CMakeLists.txt b/extensions/script/CMakeLists.txt
index bc35066..a95a14d 100644
--- a/extensions/script/CMakeLists.txt
+++ b/extensions/script/CMakeLists.txt
@@ -49,9 +49,8 @@ if (ENABLE_LUA_SCRIPTING)
SET(CMAKE_FIND_PACKAGE_SORT_DIRECTION ASC)
find_package(Lua 5.1 REQUIRED)
- include_directories(${LUA_INCLUDE_DIR})
-
- include_directories(lua)
+ target_include_directories(minifi-script-extensions PRIVATE lua)
+ target_include_directories(minifi-script-extensions PUBLIC
${LUA_INCLUDE_DIR})
add_definitions(-DLUA_SUPPORT)
file(GLOB LUA_SOURCES "lua/*.cpp")
diff --git a/extensions/script/ExecuteScript.cpp
b/extensions/script/ExecuteScript.cpp
index ee69175..77029f1 100644
--- a/extensions/script/ExecuteScript.cpp
+++ b/extensions/script/ExecuteScript.cpp
@@ -27,12 +27,9 @@
#include <PythonScriptEngine.h>
#endif // PYTHON_SUPPORT
-#ifdef LUA_SUPPORT
-#include <LuaScriptEngine.h>
-#endif // LUA_SUPPORT
-
#include "ExecuteScript.h"
#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
namespace org {
namespace apache {
@@ -40,17 +37,28 @@ namespace nifi {
namespace minifi {
namespace processors {
-core::Property ExecuteScript::ScriptEngine("Script Engine", // NOLINT
- R"(The engine to execute scripts (python, lua))", "python");
-core::Property ExecuteScript::ScriptFile("Script File", // NOLINT
+core::Property ExecuteScript::ScriptEngine(
+ core::PropertyBuilder::createProperty("Script Engine")
+ ->withDescription(R"(The engine to execute scripts (python, lua))")
+ ->isRequired(true)
+ ->withAllowableValues(ScriptEngineOption::values())
+ ->withDefaultValue(toString(ScriptEngineOption::PYTHON))
+ ->build());
+core::Property ExecuteScript::ScriptFile("Script File",
R"(Path to script file to execute. Only one of Script File or Script Body
may be used)", "");
-core::Property ExecuteScript::ScriptBody("Script Body", // NOLINT
+core::Property ExecuteScript::ScriptBody("Script Body",
R"(Body of script to execute. Only one of Script File or Script Body may
be used)", "");
-core::Property ExecuteScript::ModuleDirectory("Module Directory", // NOLINT
+core::Property ExecuteScript::ModuleDirectory("Module Directory",
R"(Comma-separated list of paths to files and/or directories which contain
modules required by the script)", "");
-core::Relationship ExecuteScript::Success("success", "Script successes"); //
NOLINT
-core::Relationship ExecuteScript::Failure("failure", "Script failures"); //
NOLINT
+core::Relationship ExecuteScript::Success("success", "Script successes");
+core::Relationship ExecuteScript::Failure("failure", "Script failures");
+
+ScriptEngineFactory::ScriptEngineFactory(core::Relationship& success,
core::Relationship& failure, std::shared_ptr<core::logging::Logger> logger)
+ : success_(success),
+ failure_(failure),
+ logger_(logger) {
+}
void ExecuteScript::initialize() {
std::set<core::Property> properties;
@@ -71,17 +79,25 @@ void ExecuteScript::initialize() {
}
void ExecuteScript::onSchedule(core::ProcessContext *context,
core::ProcessSessionFactory* /*sessionFactory*/) {
- if (!context->getProperty(ScriptEngine.getName(), script_engine_)) {
- logger_->log_error("Script Engine attribute is missing or invalid");
- }
+#ifdef LUA_SUPPORT
+ script_engine_q_ =
std::make_unique<ScriptEngineQueue<lua::LuaScriptEngine>>(getMaxConcurrentTasks(),
engine_factory_, logger_);
+#endif // LUA_SUPPORT
+#ifdef PYTHON_SUPPORT
+ python_script_engine_ =
engine_factory_.createEngine<python::PythonScriptEngine>();
+#endif // PYTHON_SUPPORT
+
+ script_engine_ =
ScriptEngineOption::parse(utils::parsePropertyWithAllowableValuesOrThrow(*context,
ScriptEngine.getName(), ScriptEngineOption::values()).c_str());
context->getProperty(ScriptFile.getName(), script_file_);
context->getProperty(ScriptBody.getName(), script_body_);
context->getProperty(ModuleDirectory.getName(), module_directory_);
if (script_file_.empty() && script_body_.empty()) {
- logger_->log_error("Either Script Body or Script File must be defined");
- return;
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Either Script Body or Script
File must be defined");
+ }
+
+ if (!script_file_.empty() && !script_body_.empty()) {
+ throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Only one of Script File or
Script Body may be defined!");
}
}
@@ -89,70 +105,53 @@ void ExecuteScript::onTrigger(const
std::shared_ptr<core::ProcessContext> &conte
const std::shared_ptr<core::ProcessSession>
&session) {
std::shared_ptr<script::ScriptEngine> engine;
- // Use an existing engine, if one is available
- if (script_engine_q_.try_dequeue(engine)) {
- logger_->log_debug("Using available %s script engine instance",
script_engine_);
- } else {
- logger_->log_info("Creating new %s script instance", script_engine_);
- logger_->log_info("Approximately %d %s script instances created for this
processor",
- script_engine_q_.size_approx(),
- script_engine_);
-
- if (script_engine_ == "python") {
+ if (script_engine_ == ScriptEngineOption::PYTHON) {
#ifdef PYTHON_SUPPORT
- engine = createEngine<python::PythonScriptEngine>();
+ engine = python_script_engine_;
#else
- throw std::runtime_error("Python support is disabled in this build.");
+ throw std::runtime_error("Python support is disabled in this build.");
#endif // PYTHON_SUPPORT
- } else if (script_engine_ == "lua") {
+ } else if (script_engine_ == ScriptEngineOption::LUA) {
#ifdef LUA_SUPPORT
- engine = createEngine<lua::LuaScriptEngine>();
+ engine = script_engine_q_->getScriptEngine();
#else
- throw std::runtime_error("Lua support is disabled in this build.");
+ throw std::runtime_error("Lua support is disabled in this build.");
#endif // LUA_SUPPORT
- }
-
- if (engine == nullptr) {
- throw std::runtime_error("No script engine available");
- }
-
- if (!script_body_.empty()) {
- engine->eval(script_body_);
- } else if (!script_file_.empty()) {
- engine->evalFile(script_file_);
- } else {
- throw std::runtime_error("Neither Script Body nor Script File is
available to execute");
- }
}
- if (script_engine_ == "python") {
+ if (engine == nullptr) {
+ throw std::runtime_error("No script engine available");
+ }
+
+ if (!script_body_.empty()) {
+ engine->eval(script_body_);
+ } else if (!script_file_.empty()) {
+ engine->evalFile(script_file_);
+ } else {
+ throw std::runtime_error("Neither Script Body nor Script File is available
to execute");
+ }
+
+ if (script_engine_ == ScriptEngineOption::PYTHON) {
#ifdef PYTHON_SUPPORT
triggerEngineProcessor<python::PythonScriptEngine>(engine, context,
session);
#else
throw std::runtime_error("Python support is disabled in this build.");
#endif // PYTHON_SUPPORT
- } else if (script_engine_ == "lua") {
+ } else if (script_engine_ == ScriptEngineOption::LUA) {
#ifdef LUA_SUPPORT
triggerEngineProcessor<lua::LuaScriptEngine>(engine, context, session);
+
script_engine_q_->returnScriptEngine(std::static_pointer_cast<lua::LuaScriptEngine>(engine));
#else
throw std::runtime_error("Lua support is disabled in this build.");
#endif // LUA_SUPPORT
}
-
- // Make engine available for use again
- if (script_engine_q_.size_approx() < getMaxConcurrentTasks()) {
- logger_->log_debug("Releasing %s script engine", script_engine_);
- script_engine_q_.enqueue(engine);
- } else {
- logger_->log_info("Destroying script engine because it is no longer
needed");
- }
}
REGISTER_RESOURCE(ExecuteScript, "Executes a script given the flow file and a
process session. The script is responsible for handling the incoming flow file
(transfer to SUCCESS or remove, e.g.) "
"as well as any flow files created by the script. If the handling is
incomplete or incorrect, the session will be rolled back.Scripts must define an
onTrigger function which accepts NiFi Context"
" and Property objects. For efficiency, scripts are executed once when the
processor is run, then the onTrigger method is called for each incoming
flowfile. This enables scripts to keep state "
"if they wish, although there will be a script context per concurrent task
of the processor. In order to, e.g., compute an arithmetic sum based on
incoming flow file information, set the "
- "concurrent tasks to 1."); // NOLINT
+ "concurrent tasks to 1.");
} /* namespace processors */
} /* namespace minifi */
diff --git a/extensions/script/ExecuteScript.h
b/extensions/script/ExecuteScript.h
index 00d9ba7..b121967 100644
--- a/extensions/script/ExecuteScript.h
+++ b/extensions/script/ExecuteScript.h
@@ -22,23 +22,118 @@
#include <string>
#include <memory>
+#include <utility>
#include "concurrentqueue.h"
#include "core/Processor.h"
#include "ScriptEngine.h"
#include "ScriptProcessContext.h"
+#include "utils/Enum.h"
+
+#ifdef LUA_SUPPORT
+#include "lua/LuaScriptEngine.h"
+#endif // LUA_SUPPORT
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
+
+#ifdef PYTHON_SUPPORT
+namespace python {
+class PythonScriptEngine;
+}
+#endif // PYTHON_SUPPORT
+
namespace processors {
+class ScriptEngineFactory {
+ public:
+ ScriptEngineFactory(core::Relationship& success, core::Relationship&
failure, std::shared_ptr<core::logging::Logger> logger);
+
+ template<typename T>
+ std::enable_if_t<std::is_base_of_v<script::ScriptEngine, T>,
std::shared_ptr<T>> createEngine() const {
+ auto engine = std::make_shared<T>();
+
+ engine->bind("log", logger_);
+ engine->bind("REL_SUCCESS", success_);
+ engine->bind("REL_FAILURE", failure_);
+
+ return engine;
+ }
+
+ private:
+ core::Relationship& success_;
+ core::Relationship& failure_;
+ std::shared_ptr<core::logging::Logger> logger_;
+};
+
+template<typename T, typename =
std::enable_if_t<std::is_base_of_v<script::ScriptEngine, T>>>
+class ScriptEngineQueue {
+ public:
+ ScriptEngineQueue(uint8_t max_engine_count, ScriptEngineFactory&
engine_factory, std::shared_ptr<core::logging::Logger> logger)
+ : max_engine_count_(max_engine_count),
+ engine_factory_(engine_factory),
+ logger_(logger) {
+ }
+
+ std::shared_ptr<script::ScriptEngine> getScriptEngine() {
+ std::shared_ptr<script::ScriptEngine> engine;
+ // Use an existing engine, if one is available
+ if (engine_queue_.try_dequeue(engine)) {
+ logger_->log_debug("Using available [%p] script engine instance",
engine.get());
+ return engine;
+ } else {
+ const std::lock_guard<std::mutex> lock(counter_mutex_);
+ if (engine_instance_count_ < max_engine_count_) {
+ ++engine_instance_count_;
+ engine = engine_factory_.createEngine<T>();
+ logger_->log_info("Created new [%p] script engine instance. Number of
instances: %d / %d.", engine.get(), engine_instance_count_, max_engine_count_);
+ return engine;
+ }
+ }
+
+ std::unique_lock<std::mutex> lock(queue_mutex_);
+ logger_->log_debug("Waiting for available script engine instance...");
+ queue_cv_.wait(lock, [this](){ return engine_queue_.size_approx() > 0; });
+ if (!engine_queue_.try_dequeue(engine)) {
+ throw std::runtime_error("No script engine available");
+ }
+ return engine;
+ }
+
+ void returnScriptEngine(std::shared_ptr<T>&& engine) {
+ const std::lock_guard<std::mutex> lock(queue_mutex_);
+ if (engine_queue_.size_approx() < max_engine_count_) {
+ logger_->log_debug("Releasing [%p] script engine", engine.get());
+ engine_queue_.enqueue(std::move(engine));
+ } else {
+ logger_->log_info("Destroying script engine because it is no longer
needed");
+ }
+ }
+
+ private:
+ const uint8_t max_engine_count_;
+ ScriptEngineFactory& engine_factory_;
+ std::shared_ptr<core::logging::Logger> logger_;
+ moodycamel::ConcurrentQueue<std::shared_ptr<T>> engine_queue_;
+ std::mutex queue_mutex_;
+ std::condition_variable queue_cv_;
+ uint8_t engine_instance_count_ = 0;
+ std::mutex counter_mutex_;
+};
+
class ExecuteScript : public core::Processor {
public:
+ SMART_ENUM(ScriptEngineOption,
+ (LUA, "lua"),
+ (PYTHON, "python")
+ )
+
explicit ExecuteScript(const std::string &name, const utils::Identifier
&uuid = {})
- : Processor(name, uuid) {
+ : Processor(name, uuid),
+ engine_factory_(Success, Failure, logger_) {
}
static core::Property ScriptEngine;
@@ -60,23 +155,18 @@ class ExecuteScript : public core::Processor {
private:
std::shared_ptr<core::logging::Logger> logger_ =
core::logging::LoggerFactory<ExecuteScript>::getLogger();
- std::string script_engine_;
+ ScriptEngineOption script_engine_;
std::string script_file_;
std::string script_body_;
std::string module_directory_;
- moodycamel::ConcurrentQueue<std::shared_ptr<script::ScriptEngine>>
script_engine_q_;
-
- template<typename T>
- std::shared_ptr<T> createEngine() const {
- auto engine = std::make_shared<T>();
-
- engine->bind("log", logger_);
- engine->bind("REL_SUCCESS", Success);
- engine->bind("REL_FAILURE", Failure);
-
- return engine;
- }
+ ScriptEngineFactory engine_factory_;
+#ifdef LUA_SUPPORT
+ std::unique_ptr<ScriptEngineQueue<lua::LuaScriptEngine>> script_engine_q_;
+#endif // LUA_SUPPORT
+#ifdef PYTHON_SUPPORT
+ std::shared_ptr<python::PythonScriptEngine> python_script_engine_;
+#endif // PYTHON_SUPPORT
template<typename T>
void triggerEngineProcessor(const std::shared_ptr<script::ScriptEngine>
&engine,
diff --git a/extensions/script/tests/CMakeLists.txt
b/extensions/script/tests/CMakeLists.txt
index 3319763..cf4bdc8 100644
--- a/extensions/script/tests/CMakeLists.txt
+++ b/extensions/script/tests/CMakeLists.txt
@@ -83,7 +83,6 @@ FOREACH(testfile ${EXECUTESCRIPT_LUA_TESTS})
target_include_directories(${testfilename} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/extensions/script/lua")
target_include_directories(${testfilename} PRIVATE BEFORE
"${CMAKE_SOURCE_DIR}/libminifi/test/")
target_include_directories(${testfilename} SYSTEM PRIVATE BEFORE
"${SOL2_INCLUDE_DIR}")
- target_include_directories(${testfilename} PRIVATE BEFORE
${LUA_INCLUDE_DIR})
add_definitions(-DLUA_SUPPORT)
target_link_libraries(${testfilename} minifi-script-extensions)
target_link_libraries(${testfilename} minifi-standard-processors)
diff --git a/extensions/script/tests/LuaScriptEngineTests.cpp
b/extensions/script/tests/LuaScriptEngineTests.cpp
index 27716fd..757ad5a 100644
--- a/extensions/script/tests/LuaScriptEngineTests.cpp
+++ b/extensions/script/tests/LuaScriptEngineTests.cpp
@@ -26,7 +26,10 @@ TEST_CASE("LuaScriptEngine errors during eval",
"[luascriptengineeval]") {
LuaScriptEngine engine;
REQUIRE_NOTHROW(engine.eval("print('foo')"));
// The exception message comes from the lua engine
- REQUIRE_THROWS_MATCHES(engine.eval("shout('foo')"), ScriptException, ExceptionSubStringMatcher<ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value"}));
+ REQUIRE_THROWS_MATCHES(
+ engine.eval("shout('foo')"),
+ ScriptException,
+ ExceptionSubStringMatcher<ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value", "attempt to call global 'shout'"}));
}
TEST_CASE("LuaScriptEngine errors during call", "[luascriptenginecall]") {
@@ -42,5 +45,8 @@ TEST_CASE("LuaScriptEngine errors during call",
"[luascriptenginecall]") {
)"));
REQUIRE_NOTHROW(engine.call("foo"));
// The exception message comes from the lua engine
- REQUIRE_THROWS_MATCHES(engine.call("bar"), ScriptException, ExceptionSubStringMatcher<ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value"}));
+ REQUIRE_THROWS_MATCHES(
+ engine.call("bar"),
+ ScriptException,
+ ExceptionSubStringMatcher<ScriptException>({"global 'shout' is not callable (a nil value)", "attempt to call a nil value", "attempt to call global 'shout'"}));
}
diff --git
a/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
b/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
index 5667858..4a3019a 100644
--- a/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
+++ b/extensions/script/tests/TestExecuteScriptProcessorWithLuaScript.cpp
@@ -27,7 +27,47 @@
#include "processors/GetFile.h"
#include "processors/PutFile.h"
-TEST_CASE("Lua: Test Log", "[executescriptLuaLog]") { // NOLINT
+TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+
+ auto executeScript = plan->addProcessor("ExecuteScript", "executeScript");
+
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptEngine.getName(), "");
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptFile.getName(), "/path/to/script.lua");
+
+ REQUIRE_THROWS_AS(testController.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE("Neither script body nor script file is set",
"[executescriptMisconfiguration]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+
+ auto executeScript = plan->addProcessor("ExecuteScript", "executeScript");
+
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptEngine.getName(), "lua");
+
+ REQUIRE_THROWS_AS(testController.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE("Test both script body and script file set",
"[executescriptMisconfiguration]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+
+ auto executeScript = plan->addProcessor("ExecuteScript", "executeScript");
+
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptEngine.getName(), "lua");
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptFile.getName(), "/path/to/script.lua");
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptBody.getName(), R"(
+ function onTrigger(context, session)
+ log:info('hello from lua')
+ end
+ )");
+
+ REQUIRE_THROWS_AS(testController.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE("Lua: Test Log", "[executescriptLuaLog]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -70,7 +110,7 @@ TEST_CASE("Lua: Test Log", "[executescriptLuaLog]") { //
NOLINT
logTestController.reset();
}
-TEST_CASE("Lua: Test Read File", "[executescriptLuaRead]") { // NOLINT
+TEST_CASE("Lua: Test Read File", "[executescriptLuaRead]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -158,7 +198,7 @@ TEST_CASE("Lua: Test Read File", "[executescriptLuaRead]")
{ // NOLINT
logTestController.reset();
}
-TEST_CASE("Lua: Test Write File", "[executescriptLuaWrite]") { // NOLINT
+TEST_CASE("Lua: Test Write File", "[executescriptLuaWrite]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -244,7 +284,7 @@ TEST_CASE("Lua: Test Write File",
"[executescriptLuaWrite]") { // NOLINT
logTestController.reset();
}
-TEST_CASE("Lua: Test Update Attribute", "[executescriptLuaUpdateAttribute]") {
// NOLINT
+TEST_CASE("Lua: Test Update Attribute", "[executescriptLuaUpdateAttribute]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -299,7 +339,7 @@ TEST_CASE("Lua: Test Update Attribute",
"[executescriptLuaUpdateAttribute]") { /
logTestController.reset();
}
-TEST_CASE("Lua: Test Create", "[executescriptLuaCreate]") { // NOLINT
+TEST_CASE("Lua: Test Create", "[executescriptLuaCreate]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -332,7 +372,7 @@ TEST_CASE("Lua: Test Create", "[executescriptLuaCreate]") {
// NOLINT
logTestController.reset();
}
-TEST_CASE("Lua: Test Require", "[executescriptLuaRequire]") { // NOLINT
+TEST_CASE("Lua: Test Require", "[executescriptLuaRequire]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
diff --git
a/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
b/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
index 1124745..100d0c1 100644
--- a/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
+++ b/extensions/script/tests/TestExecuteScriptProcessorWithPythonScript.cpp
@@ -27,7 +27,46 @@
#include "processors/GetFile.h"
#include "processors/PutFile.h"
-TEST_CASE("Python: Test Read File", "[executescriptPythonRead]") { // NOLINT
+TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+
+ auto executeScript = plan->addProcessor("ExecuteScript", "executeScript");
+
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptEngine.getName(), "");
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptFile.getName(), "/path/to/script.py");
+
+ REQUIRE_THROWS_AS(testController.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE("Neither script body nor script file is set",
"[executescriptMisconfiguration]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+
+ auto executeScript = plan->addProcessor("ExecuteScript", "executeScript");
+
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptEngine.getName(), "python");
+
+ REQUIRE_THROWS_AS(testController.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE("Test both script body and script file set",
"[executescriptMisconfiguration]") {
+ TestController testController;
+ auto plan = testController.createPlan();
+
+ auto executeScript = plan->addProcessor("ExecuteScript", "executeScript");
+
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptEngine.getName(), "python");
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptFile.getName(), "/path/to/script.py");
+ plan->setProperty(executeScript,
minifi::processors::ExecuteScript::ScriptBody.getName(), R"(
+ def onTrigger(context, session):
+ log.info('hello from python')
+ )");
+
+ REQUIRE_THROWS_AS(testController.runSession(plan, true), minifi::Exception);
+}
+
+TEST_CASE("Python: Test Read File", "[executescriptPythonRead]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -112,7 +151,7 @@ TEST_CASE("Python: Test Read File",
"[executescriptPythonRead]") { // NOLINT
logTestController.reset();
}
-TEST_CASE("Python: Test Write File", "[executescriptPythonWrite]") { // NOLINT
+TEST_CASE("Python: Test Write File", "[executescriptPythonWrite]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -192,7 +231,7 @@ TEST_CASE("Python: Test Write File",
"[executescriptPythonWrite]") { // NOLINT
logTestController.reset();
}
-TEST_CASE("Python: Test Create", "[executescriptPythonCreate]") { // NOLINT
+TEST_CASE("Python: Test Create", "[executescriptPythonCreate]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -222,7 +261,7 @@ TEST_CASE("Python: Test Create",
"[executescriptPythonCreate]") { // NOLINT
logTestController.reset();
}
-TEST_CASE("Python: Test Update Attribute",
"[executescriptPythonUpdateAttribute]") { // NOLINT
+TEST_CASE("Python: Test Update Attribute",
"[executescriptPythonUpdateAttribute]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();
@@ -274,7 +313,7 @@ TEST_CASE("Python: Test Update Attribute",
"[executescriptPythonUpdateAttribute]
logTestController.reset();
}
-TEST_CASE("Python: Test Get Context Property",
"[executescriptPythonGetContextProperty]") { // NOLINT
+TEST_CASE("Python: Test Get Context Property",
"[executescriptPythonGetContextProperty]") {
TestController testController;
LogTestController &logTestController = LogTestController::getInstance();