This is an automated email from the ASF dual-hosted git repository. phrocker pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 09fd20476d0f404020b15d2c8c13b620f48e0ffd Author: Arpad Boda <[email protected]> AuthorDate: Fri Dec 14 17:33:34 2018 +0100 MINIFICPP-692 - Ensure calls to get a flowfile are consistent across languages This closes #461. Signed-off-by: Marc Parisi <[email protected]> --- nanofi/include/api/nanofi.h | 8 +++----- nanofi/src/api/nanofi.cpp | 20 ++++++------------- nanofi/tests/CAPITests.cpp | 2 +- python/getFile.py | 16 +++++++++------ python/minifi/__init__.py | 47 ++++++++++++++++++++++++++++++++++++--------- 5 files changed, 58 insertions(+), 35 deletions(-) diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h index 0650492..04f43e5 100644 --- a/nanofi/include/api/nanofi.h +++ b/nanofi/include/api/nanofi.h @@ -131,7 +131,7 @@ flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c); **/ processor *add_processor(flow * flow, const char * name); -processor *add_python_processor(flow *, void (*ontrigger_callback)(processor_session *session)); +processor *add_python_processor(flow *, processor_logic* logic); /** * Create a standalone instance of the given processor. @@ -233,8 +233,6 @@ flow_file_record *get_next_flow_file(nifi_instance *, flow *); **/ size_t get_flow_files(nifi_instance * instance, flow * flow, flow_file_record ** flowfiles, size_t size); -flow_file_record *get(nifi_instance *,flow *, processor_session *); - /** * Invoke a standalone processor without input data. * The processor is expected to generate flow file. @@ -282,7 +280,7 @@ flow_file_record* create_ff_object_na(const char *file, const size_t len, const * @param context current processor context * @return a flow file record or nullptr in case there is none in the session **/ -flow_file_record* get_flowfile(processor_session* session, processor_context* context); +flow_file_record* get(processor_session *session, processor_context *context); /** @@ -299,7 +297,7 @@ void free_flowfile(flow_file_record* ff); * @size size size of the data pointed by "value" * @return 0 in case of success, -1 otherwise (already existed) **/ -uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size); +uint8_t add_attribute(flow_file_record* ff, const char *key, void *value, size_t size); /** * Updates an attribute (adds if it hasn't existed before) diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp index aa957ac..a58f325 100644 --- a/nanofi/src/api/nanofi.cpp +++ b/nanofi/src/api/nanofi.cpp @@ -397,14 +397,14 @@ flow *create_flow(nifi_instance *instance, const char *first_processor) { return new_flow; } -processor *add_python_processor(flow *flow, void (*ontrigger_callback)(processor_session *)) { - if (nullptr == flow || nullptr == ontrigger_callback) { +processor *add_python_processor(flow *flow, processor_logic* logic) { + if (nullptr == flow || nullptr == logic) { return nullptr; } - auto lambda = [ontrigger_callback](core::ProcessSession *ps) { - ontrigger_callback(static_cast<processor_session*>(ps)); //Meh, sorry for this + auto lambda = [logic](core::ProcessSession *ps, core::ProcessContext *cx) { + logic(static_cast<processor_session*>(ps), static_cast<processor_context*>(cx)); //Meh, sorry for this }; - auto proc = flow->addSimpleCallback(nullptr, lambda); + auto proc = flow->addCallback(nullptr, lambda); return static_cast<processor*>(proc.get()); } @@ -518,7 +518,7 @@ flow_file_record* flowfile_to_record(std::shared_ptr<core::FlowFile> ff, Executi return flowfile_to_record(ff, plan->getContentRepo()); } -flow_file_record* get_flowfile(processor_session* session, processor_context* context) { +flow_file_record* get(processor_session *session, processor_context *context) { auto ff = session->get(); if(!ff) { return nullptr; @@ -554,14 +554,6 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff return i; } -flow_file_record * get(nifi_instance * instance, flow * flow, processor_session * session) { - if (nullptr == instance || nullptr == flow || nullptr == session) - return nullptr; - auto ff = session->get(); - flow->setNextFlowFile(ff); - return flowfile_to_record(ff, flow); -} - flow_file_record *invoke(standalone_processor* proc) { return invoke_ff(proc, nullptr); } diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp index ca24ba8..35cbfc2 100644 --- a/nanofi/tests/CAPITests.cpp +++ b/nanofi/tests/CAPITests.cpp @@ -55,7 +55,7 @@ void big_failure_counter(flow_file_record * fr) { } void custom_processor_logic(processor_session * ps, processor_context * ctx) { - flow_file_record * ffr = get_flowfile(ps, ctx); + flow_file_record * ffr = get(ps, ctx); REQUIRE(ffr != nullptr); uint8_t * buffer = (uint8_t*)malloc(ffr->size* sizeof(uint8_t)); get_content(ffr, buffer, ffr->size); diff --git a/python/getFile.py b/python/getFile.py index 3d2b9d0..fd3e335 100644 --- a/python/getFile.py +++ b/python/getFile.py @@ -23,16 +23,20 @@ from _cffi_backend import callback class GetFilePrinterProcessor(PyProcessor): - def __init__(self,instance, minifi, flow): - PyProcessor.__init__(self,instance,minifi,flow) + def __init__(self, minifi, flow): + PyProcessor.__init__(self, minifi, flow) self._callback = None def _onTriggerCallback(self): - def onTrigger(session): - flow_file = self.get(session) + def onTrigger(session, context): + flow_file = self.get(session, context) if flow_file: - flow_file.add_attribute("python_test","value") - self.transfer(session,flow_file, "success") + if flow_file.add_attribute("python_test","value"): + print("Add attribute succeeded") + if not flow_file.add_attribute("python_test","value2"): + print("Cannot add the same attribute twice!") + print ("original file name: " + flow_file.get_attribute("filename")) + self.transfer(session, flow_file, "success") return CALLBACK(onTrigger) diff --git a/python/minifi/__init__.py b/python/minifi/__init__.py index 7352eb7..b6de1d6 100644 --- a/python/minifi/__init__.py +++ b/python/minifi/__init__.py @@ -15,7 +15,7 @@ # limitations under the License. from ctypes import cdll import ctypes -from abc import abstractmethod +from abc import abstractmethod @@ -36,13 +36,22 @@ class CFlowFile(ctypes.Structure): ('attributes', ctypes.c_void_p), ('ffp', ctypes.c_void_p)] +class CAttribute(ctypes.Structure): + _fields_ = [('key', ctypes.c_char_p), + ('value', ctypes.c_void_p), + ('value_size', ctypes.c_size_t)] + class CProcessor(ctypes.Structure): _fields_ = [('processor_ptr', ctypes.c_void_p)] class CProcessSession(ctypes.Structure): _fields_ = [('process_session', ctypes.c_void_p)] -CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession)) +class CProcessContext(ctypes.Structure): + _fields_ = [('process_context', ctypes.c_void_p)] + + +CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext)) class Processor(object): def __init__(self, cprocessor, minifi): @@ -54,17 +63,16 @@ class Processor(object): self._minifi.set_property( self._proc, name.encode("UTF-8"), value.encode("UTF-8")) class PyProcessor(object): - def __init__(self, instance, minifi, flow): + def __init__(self, minifi, flow): super(PyProcessor, self).__init__() - self._instance = instance self._minifi = minifi self._flow = flow def setBase(self, proc): self._proc = proc - def get(self, session): - ff = self._minifi.get(self._instance.get_instance(),self._flow, session) + def get(self, session, context): + ff = self._minifi.get(session, context) if ff: return FlowFile(self._minifi, ff) else: @@ -102,9 +110,22 @@ class FlowFile(object): self._minifi = minifi self._ff = ff + def get_attribute(self, name): + attr = CAttribute(name.encode("UTF-8"), 0, 0) + if self._minifi.get_attribute(self._ff, attr) != 0: + return "" + if attr.value_size > 0: + return ctypes.cast(attr.value, ctypes.c_char_p).value.decode("ascii") + return "" + def add_attribute(self, name, value): vallen = len(value) - self._minifi.add_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen) + ret = self._minifi.add_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen) + return True if ret == 0 else False + + def update_attribute(self, name, value): + vallen = len(value) + self._minifi.update_attribute(self._ff, name.encode("UTF-8"), value.encode("UTF-8"), vallen) def get_instance(self): return self._ff @@ -138,7 +159,7 @@ class MiNiFi(object): self._minifi.transmit_flowfile.argtypes = [ctypes.POINTER(CFlowFile) , ctypes.POINTER(NIFI_STRUCT) ] self._minifi.transmit_flowfile.restype = ctypes.c_int """ get ff """ - self._minifi.get.argtypes = [ctypes.POINTER(NIFI_STRUCT) , ctypes.POINTER(CFlow), ctypes.POINTER(CProcessSession) ] + self._minifi.get.argtypes = [ctypes.POINTER(CProcessSession), ctypes.POINTER(CProcessContext) ] self._minifi.get.restype = ctypes.POINTER(CFlowFile) """ add python processor """ self._minifi.add_python_processor.argtypes = [ctypes.POINTER(CFlow) , ctypes.c_void_p ] @@ -150,6 +171,14 @@ class MiNiFi(object): self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ] self._minifi.add_attribute.restype = ctypes.c_int + """ update (overwrite) attribute to ff """ + self._minifi.update_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ] + self._minifi.update_attribute.restype = None + + """ get attribute of ff """ + self._minifi.get_attribute.argtypes = [ctypes.POINTER(CFlowFile), ctypes.POINTER(CAttribute) ] + self._minifi.get_attribute.restype = ctypes.c_int + self._minifi.init_api.argtype = ctypes.c_char_p self._minifi.init_api.restype = ctypes.c_int self._minifi.init_api(dll_file.encode("UTF-8")) @@ -178,7 +207,7 @@ class MiNiFi(object): return Processor(proc,self._minifi) def create_python_processor(self, module, processor): - m = getattr(module,processor)(self._instance,self._minifi,self._flow) + m = getattr(module, processor)(self._minifi, self._flow) proc = self._minifi.add_python_processor(self._flow, m.getTriggerCallback()) m.setBase(proc) return m
