This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 09fd20476d0f404020b15d2c8c13b620f48e0ffd
Author: Arpad Boda <[email protected]>
AuthorDate: Fri Dec 14 17:33:34 2018 +0100

    MINIFICPP-692 - Ensure calls to get a flowfile are consistent across languages
    
    This closes #461.
    
    Signed-off-by: Marc Parisi <[email protected]>
---
 nanofi/include/api/nanofi.h |  8 +++-----
 nanofi/src/api/nanofi.cpp   | 20 ++++++-------------
 nanofi/tests/CAPITests.cpp  |  2 +-
 python/getFile.py           | 16 +++++++++------
 python/minifi/__init__.py   | 47 ++++++++++++++++++++++++++++++++++++---------
 5 files changed, 58 insertions(+), 35 deletions(-)

diff --git a/nanofi/include/api/nanofi.h b/nanofi/include/api/nanofi.h
index 0650492..04f43e5 100644
--- a/nanofi/include/api/nanofi.h
+++ b/nanofi/include/api/nanofi.h
@@ -131,7 +131,7 @@ flow *create_getfile(nifi_instance *instance, flow *parent, 
GetFileConfig *c);
  **/
 processor *add_processor(flow * flow, const char * name);
 
-processor *add_python_processor(flow *, void 
(*ontrigger_callback)(processor_session *session));
+processor *add_python_processor(flow *, processor_logic* logic);
 
 /**
  * Create a standalone instance of the given processor.
@@ -233,8 +233,6 @@ flow_file_record *get_next_flow_file(nifi_instance *, flow 
*);
  **/
 size_t get_flow_files(nifi_instance * instance, flow * flow, flow_file_record 
** flowfiles, size_t size);
 
-flow_file_record *get(nifi_instance *,flow *, processor_session *);
-
 /**
  * Invoke a standalone processor without input data.
  * The processor is expected to generate flow file.
@@ -282,7 +280,7 @@ flow_file_record* create_ff_object_na(const char *file, 
const size_t len, const
  * @param context current processor context
  * @return a flow file record or nullptr in case there is none in the session
  **/
-flow_file_record* get_flowfile(processor_session* session, processor_context* 
context);
+flow_file_record* get(processor_session *session, processor_context *context);
 
 
 /**
@@ -299,7 +297,7 @@ void free_flowfile(flow_file_record* ff);
  * @size size size of the data pointed by "value"
  * @return 0 in case of success, -1 otherwise (already existed)
  **/
-uint8_t add_attribute(flow_file_record*, const char *key, void *value, size_t 
size);
+uint8_t add_attribute(flow_file_record* ff, const char *key, void *value, 
size_t size);
 
 /**
  * Updates an attribute (adds if it hasn't existed before)
diff --git a/nanofi/src/api/nanofi.cpp b/nanofi/src/api/nanofi.cpp
index aa957ac..a58f325 100644
--- a/nanofi/src/api/nanofi.cpp
+++ b/nanofi/src/api/nanofi.cpp
@@ -397,14 +397,14 @@ flow *create_flow(nifi_instance *instance, const char 
*first_processor) {
   return new_flow;
 }
 
-processor *add_python_processor(flow *flow, void 
(*ontrigger_callback)(processor_session *)) {
-  if (nullptr == flow || nullptr == ontrigger_callback) {
+processor *add_python_processor(flow *flow, processor_logic* logic) {
+  if (nullptr == flow || nullptr == logic) {
     return nullptr;
   }
-  auto lambda = [ontrigger_callback](core::ProcessSession *ps) {
-    ontrigger_callback(static_cast<processor_session*>(ps));  //Meh, sorry for 
this
+  auto lambda = [logic](core::ProcessSession *ps, core::ProcessContext *cx) {
+    logic(static_cast<processor_session*>(ps), 
static_cast<processor_context*>(cx));  //Meh, sorry for this
   };
-  auto proc = flow->addSimpleCallback(nullptr, lambda);
+  auto proc = flow->addCallback(nullptr, lambda);
   return static_cast<processor*>(proc.get());
 }
 
@@ -518,7 +518,7 @@ flow_file_record* 
flowfile_to_record(std::shared_ptr<core::FlowFile> ff, Executi
   return flowfile_to_record(ff, plan->getContentRepo());
 }
 
-flow_file_record* get_flowfile(processor_session* session, processor_context* 
context) {
+flow_file_record* get(processor_session *session, processor_context *context) {
   auto ff = session->get();
   if(!ff) {
     return nullptr;
@@ -554,14 +554,6 @@ size_t get_flow_files(nifi_instance *instance, flow *flow, 
flow_file_record **ff
   return i;
 }
 
-flow_file_record * get(nifi_instance * instance, flow * flow, 
processor_session * session) {
-  if (nullptr == instance || nullptr == flow || nullptr == session)
-    return nullptr;
-  auto ff = session->get();
-  flow->setNextFlowFile(ff);
-  return flowfile_to_record(ff, flow);
-}
-
 flow_file_record *invoke(standalone_processor* proc) {
   return invoke_ff(proc, nullptr);
 }
diff --git a/nanofi/tests/CAPITests.cpp b/nanofi/tests/CAPITests.cpp
index ca24ba8..35cbfc2 100644
--- a/nanofi/tests/CAPITests.cpp
+++ b/nanofi/tests/CAPITests.cpp
@@ -55,7 +55,7 @@ void big_failure_counter(flow_file_record * fr) {
 }
 
 void custom_processor_logic(processor_session * ps, processor_context * ctx) {
-  flow_file_record * ffr = get_flowfile(ps, ctx);
+  flow_file_record * ffr = get(ps, ctx);
   REQUIRE(ffr != nullptr);
   uint8_t * buffer = (uint8_t*)malloc(ffr->size* sizeof(uint8_t));
   get_content(ffr, buffer, ffr->size);
diff --git a/python/getFile.py b/python/getFile.py
index 3d2b9d0..fd3e335 100644
--- a/python/getFile.py
+++ b/python/getFile.py
@@ -23,16 +23,20 @@ from _cffi_backend import callback
 
 
 class GetFilePrinterProcessor(PyProcessor):
-    def __init__(self,instance, minifi, flow):
-        PyProcessor.__init__(self,instance,minifi,flow)
+    def __init__(self, minifi, flow):
+        PyProcessor.__init__(self, minifi, flow)
         self._callback = None
 
     def _onTriggerCallback(self):
-        def onTrigger(session):
-            flow_file = self.get(session)
+        def onTrigger(session, context):
+            flow_file = self.get(session, context)
             if flow_file:
-              flow_file.add_attribute("python_test","value")
-              self.transfer(session,flow_file, "success")
+                if flow_file.add_attribute("python_test","value"):
+                    print("Add attribute succeeded")
+                if not flow_file.add_attribute("python_test","value2"):
+                    print("Cannot add the same attribute twice!")
+                print ("original file name: " + 
flow_file.get_attribute("filename"))
+                self.transfer(session, flow_file, "success")
         return CALLBACK(onTrigger)
 
 
diff --git a/python/minifi/__init__.py b/python/minifi/__init__.py
index 7352eb7..b6de1d6 100644
--- a/python/minifi/__init__.py
+++ b/python/minifi/__init__.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 from ctypes import cdll
 import ctypes
-from abc import  abstractmethod
+from abc import abstractmethod
 
 
 
@@ -36,13 +36,22 @@ class CFlowFile(ctypes.Structure):
                  ('attributes', ctypes.c_void_p),
                  ('ffp', ctypes.c_void_p)]
 
+class CAttribute(ctypes.Structure):
+    _fields_ = [('key', ctypes.c_char_p),
+                ('value', ctypes.c_void_p),
+                ('value_size', ctypes.c_size_t)]
+
 class CProcessor(ctypes.Structure):
     _fields_ = [('processor_ptr', ctypes.c_void_p)]
 
 class CProcessSession(ctypes.Structure):
     _fields_ = [('process_session', ctypes.c_void_p)]
 
-CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession))
+class CProcessContext(ctypes.Structure):
+    _fields_ = [('process_context', ctypes.c_void_p)]
+
+
+CALLBACK = ctypes.CFUNCTYPE(None, ctypes.POINTER(CProcessSession), 
ctypes.POINTER(CProcessContext))
 
 class Processor(object):
     def __init__(self, cprocessor, minifi):
@@ -54,17 +63,16 @@ class Processor(object):
         self._minifi.set_property( self._proc, name.encode("UTF-8"), 
value.encode("UTF-8"))
 
 class PyProcessor(object):
-    def __init__(self, instance, minifi, flow):
+    def __init__(self, minifi, flow):
         super(PyProcessor, self).__init__()
-        self._instance = instance
         self._minifi = minifi
         self._flow = flow
 
     def setBase(self, proc):
         self._proc = proc
 
-    def get(self, session):
-        ff = self._minifi.get(self._instance.get_instance(),self._flow, 
session)
+    def get(self, session, context):
+        ff = self._minifi.get(session, context)
         if ff:
             return FlowFile(self._minifi, ff)
         else:
@@ -102,9 +110,22 @@ class FlowFile(object):
         self._minifi = minifi
         self._ff = ff
 
+    def get_attribute(self, name):
+        attr = CAttribute(name.encode("UTF-8"), 0, 0)
+        if self._minifi.get_attribute(self._ff, attr) != 0:
+            return ""
+        if attr.value_size > 0:
+            return ctypes.cast(attr.value, 
ctypes.c_char_p).value.decode("ascii")
+        return ""
+
     def add_attribute(self, name, value):
         vallen = len(value)
-        self._minifi.add_attribute(self._ff, name.encode("UTF-8"), 
value.encode("UTF-8"), vallen)
+        ret = self._minifi.add_attribute(self._ff, name.encode("UTF-8"), 
value.encode("UTF-8"), vallen)
+        return True if ret == 0 else False
+
+    def update_attribute(self, name, value):
+        vallen = len(value)
+        self._minifi.update_attribute(self._ff, name.encode("UTF-8"), 
value.encode("UTF-8"), vallen)
 
     def get_instance(self):
         return self._ff
@@ -138,7 +159,7 @@ class MiNiFi(object):
         self._minifi.transmit_flowfile.argtypes = [ctypes.POINTER(CFlowFile) , 
ctypes.POINTER(NIFI_STRUCT) ]
         self._minifi.transmit_flowfile.restype = ctypes.c_int
         """ get ff """
-        self._minifi.get.argtypes = [ctypes.POINTER(NIFI_STRUCT) , 
ctypes.POINTER(CFlow), ctypes.POINTER(CProcessSession) ]
+        self._minifi.get.argtypes = [ctypes.POINTER(CProcessSession), 
ctypes.POINTER(CProcessContext) ]
         self._minifi.get.restype = ctypes.POINTER(CFlowFile)
         """ add python processor """
         self._minifi.add_python_processor.argtypes = [ctypes.POINTER(CFlow) , 
ctypes.c_void_p ]
@@ -150,6 +171,14 @@ class MiNiFi(object):
         self._minifi.add_attribute.argtypes = [ctypes.POINTER(CFlowFile), 
ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
         self._minifi.add_attribute.restype = ctypes.c_int
 
+        """ update (overwrite) attribute to ff """
+        self._minifi.update_attribute.argtypes = [ctypes.POINTER(CFlowFile), 
ctypes.c_char_p, ctypes.c_char_p, ctypes.c_int ]
+        self._minifi.update_attribute.restype = None
+
+        """ get attribute of ff """
+        self._minifi.get_attribute.argtypes = [ctypes.POINTER(CFlowFile), 
ctypes.POINTER(CAttribute) ]
+        self._minifi.get_attribute.restype = ctypes.c_int
+
         self._minifi.init_api.argtype = ctypes.c_char_p
         self._minifi.init_api.restype = ctypes.c_int
         self._minifi.init_api(dll_file.encode("UTF-8"))
@@ -178,7 +207,7 @@ class MiNiFi(object):
         return Processor(proc,self._minifi)
 
     def create_python_processor(self, module, processor):
-        m =  getattr(module,processor)(self._instance,self._minifi,self._flow)
+        m =  getattr(module, processor)(self._minifi, self._flow)
         proc = self._minifi.add_python_processor(self._flow, 
m.getTriggerCallback())
         m.setBase(proc)
         return m

Reply via email to