MINIFI-6: Initial Checkin for basic C++ MiNiFi framework

Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/62394f72
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/62394f72
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/62394f72

Branch: refs/heads/MINIFI-6
Commit: 62394f727e48382c982e23b5b29277238de816ad
Parents: 83124ad
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Thu Apr 28 08:46:53 2016 -0700
Committer: Bin Qiu <benqiu2...@gmail.com>
Committed: Thu Apr 28 08:46:53 2016 -0700

----------------------------------------------------------------------
 Makefile                                    |   40 +
 README.md                                   |   34 +
 inc/Connection.h                            |  199 ++
 inc/Exception.h                             |   93 +
 inc/FlowFileRecord.h                        |  202 ++
 inc/Logger.h                                |  154 ++
 inc/ProcessContext.h                        |   99 +
 inc/ProcessSession.h                        |  112 +
 inc/Processor.h                             |  282 ++
 inc/Property.h                              |   78 +
 inc/Relationship.h                          |   86 +
 inc/ResourceClaim.h                         |   82 +
 inc/TimeUtil.h                              |   62 +
 inc/spdlog/async_logger.h                   |   90 +
 inc/spdlog/common.h                         |  116 +
 inc/spdlog/details/async_log_helper.h       |  326 +++
 inc/spdlog/details/async_logger_impl.h      |   82 +
 inc/spdlog/details/file_helper.h            |  144 +
 inc/spdlog/details/format.cc                | 1353 ++++++++++
 inc/spdlog/details/format.h                 | 3155 ++++++++++++++++++++++
 inc/spdlog/details/line_logger.h            |  221 ++
 inc/spdlog/details/log_msg.h                |   98 +
 inc/spdlog/details/logger_impl.h            |  320 +++
 inc/spdlog/details/mpmc_bounded_q.h         |  175 ++
 inc/spdlog/details/null_mutex.h             |   43 +
 inc/spdlog/details/os.h                     |  198 ++
 inc/spdlog/details/pattern_formatter_impl.h |  628 +++++
 inc/spdlog/details/registry.h               |  180 ++
 inc/spdlog/details/spdlog_impl.h            |  154 ++
 inc/spdlog/formatter.h                      |   58 +
 inc/spdlog/logger.h                         |  132 +
 inc/spdlog/sinks/base_sink.h                |   66 +
 inc/spdlog/sinks/file_sinks.h               |  232 ++
 inc/spdlog/sinks/null_sink.h                |   52 +
 inc/spdlog/sinks/ostream_sink.h             |   67 +
 inc/spdlog/sinks/sink.h                     |   42 +
 inc/spdlog/sinks/stdout_sinks.h             |   71 +
 inc/spdlog/sinks/syslog_sink.h              |  102 +
 inc/spdlog/spdlog.h                         |  155 ++
 inc/spdlog/tweakme.h                        |   74 +
 main/MiNiFiMain.cpp                         |   33 +
 src/Connection.cpp                          |  146 +
 src/FlowFileRecord.cpp                      |  210 ++
 src/Logger.cpp                              |   27 +
 src/ProcessSession.cpp                      |  526 ++++
 src/Processor.cpp                           |  363 +++
 src/ResourceClaim.cpp                       |   41 +
 test/FlowFileRecordTest.cpp                 |   28 +
 48 files changed, 11231 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/Makefile
----------------------------------------------------------------------
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..7529a33
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,40 @@
+# Toolchain and output locations
+CC=g++
+AR=ar
+TARGET_DIR= ./build
+TARGET_LIB=libminifi.a
+TARGET_EXE=minifi
+CFLAGS=-O0 -fexceptions -fpermissive -Wno-write-strings -std=c++11 -fPIC -Wall 
-g -Wno-unused-private-field
+INCLUDES=-I./inc -I./src -I./test -I/usr/include/libxml2 
-I/usr/local/opt/leveldb/include/
+# libminifi.a is linked from the build directory
+LDDIRECTORY=-L/usr/local/opt/leveldb/out-static/ -L$(TARGET_DIR)
+LDFLAGS=-lminifi -lxml2 -lleveldb -pthread -luuid
+
+UNAME_S := $(shell uname -s)
+ifeq ($(UNAME_S),Linux)
+       LDFLAGS += -lrt
+endif
+ifeq ($(UNAME_S),Darwin)
+endif
+
+# One object file in $(TARGET_DIR) for every src/*.cpp
+OBJS:=$(shell /bin/ls src/*.cpp | xargs -n1 basename 2>/dev/null |  awk 
'/\.cpp$$/{a=$$0; gsub("\\.cpp$$",".o", a); print "$(TARGET_DIR)/" a}')
+TESTS:=FlowFileRecordTest
+
+# None of these targets is the name of a file created by its recipe
+.PHONY: all directory minifi tests clean
+
+all: directory $(TARGET_DIR)/$(TARGET_LIB) minifi tests
+
+directory:
+       mkdir -p $(TARGET_DIR)
+
+# "directory" is an order-only prerequisite: the build directory has to exist
+# before the first object is written, so that "make tests" and "make minifi"
+# also work on a clean tree, but it never forces an object to be rebuilt
+$(TARGET_DIR)/%.o: src/%.cpp | directory
+       $(CC) $(CFLAGS) $(INCLUDES) -o $@ -c $<
+
+$(TARGET_DIR)/$(TARGET_LIB): $(OBJS)
+       $(AR) crs $@ $(OBJS)
+
+minifi: $(TARGET_DIR)/$(TARGET_LIB)
+       $(CC) $(CFLAGS) $(INCLUDES) -o $(TARGET_DIR)/$(TARGET_EXE) 
main/MiNiFiMain.cpp $(LDDIRECTORY) $(LDFLAGS)
+
+tests: $(TARGET_DIR)/$(TARGET_LIB)
+       $(foreach TEST_NAME, $(TESTS),\
+       $(CC) $(CFLAGS) $(INCLUDES) -o $(TARGET_DIR)/$(TEST_NAME) 
test/$(TEST_NAME).cpp $(LDDIRECTORY) $(LDFLAGS);)
+
+clean:
+       rm -rf $(TARGET_DIR)/*

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index f050b0f..0601e6e 100644
--- a/README.md
+++ b/README.md
@@ -36,3 +36,37 @@ distributed under the License is distributed on an "AS IS" 
BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
+## Dependencies
+   * [LevelDB](https://github.com/google/leveldb) - tested with v1.18
+     MAC: brew install leveldb
+   * gcc - 4.8.4
+   * g++ - 4.8.4
+   * [libxml2](http://xmlsoft.org/) - tested with 2.9.1
+     MAC: brew install libxml2
+   * [libuuid](https://sourceforge.net/projects/libuuid/)
+     MAC: After downloading the above source, run configure/make/make install
+
+## Build instructions
+
+Build application
+ 
+   $ make
+
+Build tests
+   
+   $ make tests
+
+Clean 
+   
+   $ make clean
+
+
+## Running 
+
+Running application
+
+   $ ./build/minifi
+
+Running tests
+
+   $ ./build/FlowFileRecordTest 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/Connection.h
----------------------------------------------------------------------
diff --git a/inc/Connection.h b/inc/Connection.h
new file mode 100644
index 0000000..b09b51b
--- /dev/null
+++ b/inc/Connection.h
@@ -0,0 +1,199 @@
+/**
+ * @file Connection.h
+ * Connection class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __CONNECTION_H__
+#define __CONNECTION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+
+#include "FlowFileRecord.h"
+#include "Relationship.h"
+#include "Logger.h"
+
+//! Forward declaration
+class Processor;
+
+//! Connection Class
+/*!
+ * A queue of FlowFileRecord pointers between a source Processor and a
+ * destination Processor for one Relationship, together with the limits
+ * (queue size, queue data size, expiration) configured for that queue.
+ */
+class Connection
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new connection
+        */
+       Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID = NULL, 
uuid_t destUUID = NULL);
+       //! Destructor
+       virtual ~Connection();
+       //! Set Connection Name
+       void setName(std::string name) {
+               _name = name;
+       }
+       //! Get Connection Name
+       std::string getName(void) {
+               return (_name);
+       }
+       //! Set UUID
+       void setUUID(uuid_t uuid) {
+               uuid_copy(_uuid, uuid);
+       }
+       //! Set Source Processor UUID
+       void setSourceProcessorUUID(uuid_t uuid) {
+               uuid_copy(_srcUUID, uuid);
+       }
+       //! Set Destination Processor UUID
+       void setDestinationProcessorUUID(uuid_t uuid) {
+               uuid_copy(_destUUID, uuid);
+       }
+       //! Get Source Processor UUID
+       void getSourceProcessorUUID(uuid_t uuid) {
+               uuid_copy(uuid, _srcUUID);
+       }
+       //! Get Destination Processor UUID
+       void getDestinationProcessorUUID(uuid_t uuid) {
+               uuid_copy(uuid, _destUUID);
+       }
+       //! Get UUID
+       /*!
+        * @param uuid output buffer that receives a copy of the connection UUID
+        * @return false when @p uuid is NULL, true otherwise
+        */
+       bool getUUID(uuid_t uuid) {
+               if (uuid)
+               {
+                       uuid_copy(uuid, _uuid);
+                       return true;
+               }
+               else
+                       return false;
+       }
+       //! Set Connection Source Processor
+       void setSourceProcessor(Processor *source) {
+               _srcProcessor = source;
+       }
+       //! Get Connection Source Processor
+       Processor *getSourceProcessor() {
+               return _srcProcessor;
+       }
+       //! Set Connection Destination Processor
+       void setDestinationProcessor(Processor *dest) {
+               _destProcessor = dest;
+       }
+       //! Get Connection Destination Processor
+       Processor *getDestinationProcessor() {
+               return _destProcessor;
+       }
+       //! Set Connection relationship
+       void setRelationship(Relationship relationship) {
+               _relationship = relationship;
+       }
+       //! Get Connection relationship
+       Relationship getRelationship() {
+               return _relationship;
+       }
+       //! Set Max Queue Size
+       void setMaxQueueSize(uint64_t size)
+       {
+               _maxQueueSize = size;
+       }
+       //! Get Max Queue Size
+       uint64_t getMaxQueueSize()
+       {
+               return _maxQueueSize;
+       }
+       //! Set Max Queue Data Size
+       void setMaxQueueDataSize(uint64_t size)
+       {
+               _maxQueueDataSize = size;
+       }
+       //! Get Max Queue Data Size
+       uint64_t getMaxQueueDataSize()
+       {
+               return _maxQueueDataSize;
+       }
+       //! Set Flow expiration duration in millisecond
+       void setFlowExpirationDuration(uint64_t duration)
+       {
+               _expiredDuration = duration;
+       }
+       //! Get Flow expiration duration in millisecond
+       uint64_t getFlowExpirationDuration()
+       {
+               return _expiredDuration;
+       }
+       //! Check whether the queue is empty
+       bool isEmpty();
+       //! Check whether the queue is full to apply back pressure
+       bool isFull();
+       //! Get queue size
+       /*!
+        * @return number of flow file records in the queue, read under _mtx
+        */
+       uint64_t getQueueSize() {
+               std::lock_guard<std::mutex> lock(_mtx);
+               return _queue.size();
+       }
+       //! Get queue data size
+       /*!
+        * @return the queued data size counter, not the configured limit
+        *         (the limit is returned by getMaxQueueDataSize())
+        */
+       uint64_t getQueueDataSize()
+       {
+               return _queuedDataSize;
+       }
+       //! Put the flow file into queue
+       void put(FlowFileRecord *flow);
+       //! Poll the flow file from queue; expired flow file records are also returned
+       FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords);
+
+protected:
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! Source Processor UUID
+       uuid_t _srcUUID;
+       //! Destination Processor UUID
+       uuid_t _destUUID;
+       //! Connection Name
+       std::string _name;
+       //! Relationship for this connection
+       Relationship _relationship;
+       //! Source Processor (ProcessNode/Port)
+       Processor *_srcProcessor;
+       //! Destination Processor (ProcessNode/Port)
+       Processor *_destProcessor;
+       //! Max queue size to apply back pressure
+       std::atomic<uint64_t> _maxQueueSize;
+       //! Max queue data size to apply back pressure
+       std::atomic<uint64_t> _maxQueueDataSize;
+       //! Flow File Expiration Duration in milliseconds
+       std::atomic<uint64_t> _expiredDuration;
+
+
+private:
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Queued data size
+       std::atomic<uint64_t> _queuedDataSize;
+       //! Queue for the Flow File
+       std::queue<FlowFileRecord *> _queue;
+       //! Logger
+       Logger *_logger;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Connection(const Connection &parent);
+       Connection &operator=(const Connection &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/Exception.h
----------------------------------------------------------------------
diff --git a/inc/Exception.h b/inc/Exception.h
new file mode 100644
index 0000000..8364100
--- /dev/null
+++ b/inc/Exception.h
@@ -0,0 +1,93 @@
+/**
+ * @file Exception.h
+ * Exception class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EXCEPTION_H__
+#define __EXCEPTION_H__
+
+#include <sstream>
+#include <exception>
+#include <stdexcept>
+#include <errno.h>
+#include <string.h>
+
+//! ExceptionType
+/*!
+ * Category of a failure reported through Exception. The enumerator values
+ * index ExceptionStr, so both lists must stay in the same order;
+ * MAX_EXCEPTION is a count sentinel, not a category.
+ */
+enum ExceptionType
+{
+       FILE_OPERATION_EXCEPTION = 0,
+       FLOW_EXCEPTION,
+       PROCESSOR_EXCEPTION,
+       PROCESS_SESSION_EXCEPTION,
+       PROCESS_SCHEDULE_EXCEPTION,
+       GENERAL_EXCEPTION,
+       MAX_EXCEPTION
+};
+
+//! Exception String
+/*!
+ * Label for each ExceptionType, indexed by enumerator value. The bound is
+ * deduced from the initializer so that the static_assert below can detect
+ * a missing or surplus label.
+ */
+static const char *ExceptionStr[] =
+{
+               "File Operation",
+               "Flow File Operation",
+               "Processor Operation",
+               "Process Session Operation",
+               "Process Schedule Operation",
+               "General Operation"
+};
+
+// Fail the build when an enumerator is added without a matching label
+static_assert(sizeof(ExceptionStr) / sizeof(ExceptionStr[0]) == MAX_EXCEPTION,
+               "ExceptionStr needs exactly one entry per ExceptionType");
+
+//! Exception Type to String
+/*!
+ * @param type exception category
+ * @return the label of @p type, or NULL when @p type is not below MAX_EXCEPTION
+ */
+inline const char *ExceptionTypeToString(ExceptionType type)
+{
+       if (type < MAX_EXCEPTION)
+               return ExceptionStr[type];
+       else
+               return NULL;
+}
+
+//! Exception Class
+/*!
+ * std::exception that carries an ExceptionType category and a detail message.
+ */
+class Exception : public std::exception
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new exception
+        * @param type category of the failure
+        * @param errorMsg detail text; a NULL pointer is stored as an empty message
+        */
+       Exception(ExceptionType type, const char *errorMsg) : _type(type),
+               _errorMsg(errorMsg ? errorMsg : "") {
+       }
+       //! Destructor
+       virtual ~Exception() throw ();
+       //! Describe the exception as "<type label>:<message>"
+       /*!
+        * The text is rebuilt into _whatStr on every call, so the returned
+        * pointer refers to storage owned by this object.
+        */
+       virtual const char * what() const throw () {
+
+               // ExceptionTypeToString() returns NULL for an out-of-range type and
+               // a std::string must not be assigned from a NULL pointer
+               const char *typeStr = ExceptionTypeToString(_type);
+
+               _whatStr = (typeStr ? typeStr : "Unknown");
+
+               _whatStr += ":" + _errorMsg;
+               return _whatStr.c_str();
+       }
+
+protected:
+
+private:
+       //! Exception type
+       ExceptionType _type;
+       //! Exception detailed information
+       std::string _errorMsg;
+       //! Hold the what result
+       mutable std::string _whatStr;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/FlowFileRecord.h
----------------------------------------------------------------------
diff --git a/inc/FlowFileRecord.h b/inc/FlowFileRecord.h
new file mode 100644
index 0000000..bceee9f
--- /dev/null
+++ b/inc/FlowFileRecord.h
@@ -0,0 +1,202 @@
+/**
+ * @file FlowFileRecord.h
+ * Flow file record class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __FLOW_FILE_RECORD_H__
+#define __FLOW_FILE_RECORD_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <iostream>
+#include <sstream>
+#include <fstream>
+#include <set>
+
+#include "TimeUtil.h"
+#include "Logger.h"
+#include "ResourceClaim.h"
+
+class ProcessSession;
+class Connection;
+
+#define DEFAULT_FLOWFILE_PATH "."
+
+//! FlowFile Attribute
+/*!
+ * Identifiers of the well-known flow file attributes. The enumerator values
+ * index FlowAttributeKeyArray, so both lists must stay in the same order;
+ * MAX_FLOW_ATTRIBUTES is a count sentinel, not an attribute.
+ */
+enum FlowAttribute
+{
+       //! The flowfile's path indicates the relative directory to which a
+       //! FlowFile belongs and does not contain the filename
+       PATH = 0,
+       //! The flowfile's absolute path indicates the absolute directory to
+       //! which a FlowFile belongs and does not contain the filename
+       ABSOLUTE_PATH,
+       //! The filename of the FlowFile. The filename should not contain any
+       //! directory structure.
+       FILENAME,
+       //! A unique UUID assigned to this FlowFile.
+       UUID,
+       //! A numeric value indicating the FlowFile priority
+       //! NOTE(review): lower-case unlike its siblings; left unchanged for existing users
+       priority,
+       //! The MIME Type of this FlowFile
+       MIME_TYPE,
+       //! Specifies the reason that a FlowFile is being discarded
+       DISCARD_REASON,
+       //! Indicates an identifier other than the FlowFile's UUID that is
+       //! known to refer to this FlowFile.
+       ALTERNATE_IDENTIFIER,
+       MAX_FLOW_ATTRIBUTES
+};
+
+//! FlowFile Attribute Key
+/*!
+ * Attribute key string for each FlowAttribute, indexed by enumerator value.
+ * The bound is deduced from the initializer so that the static_assert below
+ * can detect a missing or surplus key.
+ */
+static const char *FlowAttributeKeyArray[] =
+{
+               "path",
+               "absolute.path",
+               "filename",
+               "uuid",
+               "priority",
+               "mime.type",
+               "discard.reason",
+               "alternate.identifier"
+};
+
+// Fail the build when an enumerator is added without a matching key
+static_assert(sizeof(FlowAttributeKeyArray) / sizeof(FlowAttributeKeyArray[0]) == MAX_FLOW_ATTRIBUTES,
+               "FlowAttributeKeyArray needs exactly one entry per FlowAttribute");
+
+//! FlowFile Attribute Enum to Key
+/*!
+ * @param attribute attribute identifier
+ * @return the key string of @p attribute, or NULL when @p attribute is not
+ *         below MAX_FLOW_ATTRIBUTES
+ */
+inline const char *FlowAttributeKey(FlowAttribute attribute)
+{
+       if (attribute < MAX_FLOW_ATTRIBUTES)
+               return FlowAttributeKeyArray[attribute];
+       else
+               return NULL;
+}
+
+//! Callback signatures for reading and writing FlowFile content.
+//! A callback reports an error by throwing an exception.
+using InputStreamCallback = void (*)(std::ifstream *stream);
+using OutputStreamCallback = void (*)(std::ofstream *stream);
+
+
+//! FlowFile Record Class
+/*!
+ * Holds the attributes, dates, size/offset and content claim pointer of one
+ * flow file. ProcessSession is a friend and works on the members directly.
+ */
+class FlowFileRecord
+{
+       friend class ProcessSession;
+public:
+       //! Constructor
+       /*!
+        * Create a new flow record
+        * @param attributes initial attribute key/value pairs
+        * @param claim content resource claim, NULL by default
+        */
+       FlowFileRecord(std::map<std::string, std::string> attributes,
+                       ResourceClaim *claim = NULL);
+       //! Destructor
+       virtual ~FlowFileRecord();
+
+       //! Add an attribute identified by enum
+       bool addAttribute(FlowAttribute key, std::string value);
+       //! Add an attribute identified by key string
+       bool addAttribute(std::string key, std::string value);
+       //! Remove an attribute identified by enum
+       bool removeAttribute(FlowAttribute key);
+       //! Remove an attribute identified by key string
+       bool removeAttribute(std::string key);
+       //! Update an attribute identified by enum
+       bool updateAttribute(FlowAttribute key, std::string value);
+       //! Update an attribute identified by key string
+       bool updateAttribute(std::string key, std::string value);
+       //! Read an attribute identified by enum into value
+       bool getAttribute(FlowAttribute key, std::string &value);
+       //! Read an attribute identified by key string into value
+       bool getAttribute(std::string key, std::string &value);
+       //! Duplicate the original flow file
+       void duplicate(FlowFileRecord *original);
+
+       //! Set an attribute: updates it when present, adds it otherwise
+       void setAttribute(std::string key, std::string value)
+       {
+               _attributes[key] = value;
+       }
+       //! UUID in string form
+       std::string getUUIDStr()
+       {
+               return _uuidStr;
+       }
+       //! Copy of all attributes
+       std::map<std::string, std::string> getAttributes()
+       {
+               return _attributes;
+       }
+       //! Whether the penalty is still running
+       bool isPenalized()
+       {
+               // An expiration of zero means that no penalty was set
+               if (_penaltyExpirationMs == 0)
+                       return false;
+               return _penaltyExpirationMs > getTimeMillis();
+       }
+       //! Size
+       uint64_t getSize()
+       {
+               return _size;
+       }
+       //! Offset
+       uint64_t getOffset()
+       {
+               return _offset;
+       }
+       //! Entry date
+       uint64_t getEntryDate()
+       {
+               return _entryDate;
+       }
+       //! Remember the connection this flow file was dequeued from
+       void setOriginalConnection (Connection *connection)
+       {
+               _orginalConnection = connection;
+       }
+
+protected:
+
+       //! Date at which the flow file entered the flow
+       uint64_t _entryDate;
+       //! Date at which the origin of this flow file entered the flow
+       uint64_t _lineageStartDate;
+       //! Date at which the flow file was queued
+       uint64_t _lastQueueDate;
+       //! Size in bytes of the data corresponding to this flow file
+       uint64_t _size;
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! A local unique identifier
+       uint64_t _id;
+       //! Offset to the content
+       uint64_t _offset;
+       //! Penalty expiration
+       uint64_t _penaltyExpirationMs;
+       //! Attribute key/value pairs of the flow record
+       std::map<std::string, std::string> _attributes;
+       //! Pointer to the associated content resource claim
+       ResourceClaim *_claim;
+       //! UUID string
+       std::string _uuidStr;
+       //! UUID strings of all parents
+       std::set<std::string> _lineageIdentifiers;
+
+private:
+
+       //! Local flow sequence ID
+       static std::atomic<uint64_t> _localFlowSeqNumber;
+       //! Mark for deletion
+       bool _markedDelete;
+       //! Connection queue this flow file will be transferred to or currently is in
+       Connection *_connection;
+       //! Original connection queue this flow file was dequeued from
+       Connection *_orginalConnection;
+       //! Logger
+       Logger *_logger;
+       // Copy construction and assignment are declared private and left
+       // unimplemented: records are passed by reference or pointer only
+       FlowFileRecord(const FlowFileRecord &parent);
+       FlowFileRecord &operator=(const FlowFileRecord &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/Logger.h
----------------------------------------------------------------------
diff --git a/inc/Logger.h b/inc/Logger.h
new file mode 100644
index 0000000..ac43358
--- /dev/null
+++ b/inc/Logger.h
@@ -0,0 +1,154 @@
+/**
+ * @file Logger.h
+ * Logger class declaration
+ * This is a C++ wrapper for spdlog, a lightweight C++ logging library
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOGGER_H__
+#define __LOGGER_H__
+
+#include "spdlog/spdlog.h"
+
+using spdlog::stdout_logger_mt;
+using spdlog::rotating_logger_mt;
+using spdlog::logger;
+
+#define LOG_BUFFER_SIZE 1024 //!< Size of the local buffer that FILL_BUFFER formats into
+#define FILL_BUFFER  char buffer[LOG_BUFFER_SIZE]; /* needs a printf-style parameter named 'format' followed by varargs in the enclosing function; leaves the formatted text in 'buffer' */ \
+    va_list args; \
+    va_start(args, format); \
+    vsnprintf(buffer, LOG_BUFFER_SIZE,format, args); \
+    va_end(args);
+
+//! Default maximum log file size: 5 MB (default max_file_size of the Logger constructor)
+#define DEFAULT_LOG_FILE_SIZE (5*1024*1024)
+//! Default number of log files in the rotation: 3 (default max_files of the Logger constructor)
+#define DEFAULT_LOG_FILE_NUMBER 3
+#define LOG_NAME "nifi" //!< Default logger name of the Logger constructor
+#define LOG_FILE_NAME "nifi" //!< Default log file name of the Logger constructor
+
+typedef enum /* Log levels accepted by Logger::setLogLevel(), which casts the value directly to spdlog::level::level_enum */
+{
+    trace    = 0,
+    debug    = 1,
+    info     = 2,
+    notice   = 3,
+    warn     = 4,
+    err      = 5,
+    critical = 6,
+    alert    = 7,
+    emerg    = 8,
+    off      = 9
+} LOG_LEVEL_E; // NOTE(review): the numbers presumably mirror spdlog's level enum - confirm against spdlog/common.h before changing them
+
+//! Logger Class: singleton wrapper around an spdlog rotating file logger
+class Logger {
+
+public:
+
+       //! Get the singleton logger instance, creating it on first use (the check-then-create is done without any locking)
+       static Logger * getLogger() {
+               if (!_logger)
+                       _logger = new Logger();
+               return _logger;
+       }
+       void setLogLevel(LOG_LEVEL_E level) {
+               if (_spdlog == NULL)
+                       return;
+               _spdlog->set_level((spdlog::level::level_enum) level); // the LOG_LEVEL_E value is handed to spdlog unchanged
+       }
+       //! Destructor
+       ~Logger();
+       /**
+        * @brief Log error message
+        * @param format format string ('man printf' for syntax)
+        * @warning does not check @p format for null. Caller must ensure the
+        * arguments match the format string. Returns without logging when
+        * _spdlog is NULL; the message is formatted with vsnprintf into a
+        * buffer of LOG_BUFFER_SIZE bytes.
+        */
+       void log_error(const char *const format, ...) {
+               if(_spdlog == NULL)
+                       return;
+               FILL_BUFFER
+           _spdlog->error(buffer);
+       }
+       /**
+        * @brief Log warn message
+        * @param format format string ('man printf' for syntax)
+        * @warning does not check @p format for null. Caller must ensure the
+        * arguments match the format string. Returns without logging when
+        * _spdlog is NULL; the message is formatted with vsnprintf into a
+        * buffer of LOG_BUFFER_SIZE bytes.
+        */
+       void log_warn(const char *const format, ...) {
+               if(_spdlog == NULL)
+                       return;
+               FILL_BUFFER
+           _spdlog->warn(buffer);
+       }
+       /**
+        * @brief Log info message
+        * @param format format string ('man printf' for syntax)
+        * @warning does not check @p format for null. Caller must ensure the
+        * arguments match the format string. Returns without logging when
+        * _spdlog is NULL; the message is formatted with vsnprintf into a
+        * buffer of LOG_BUFFER_SIZE bytes.
+        */
+       void log_info(const char *const format, ...) {
+               if(_spdlog == NULL)
+                       return;
+               FILL_BUFFER
+           _spdlog->info(buffer);
+       }
+       /**
+        * @brief Log debug message
+        * @param format format string ('man printf' for syntax)
+        * @warning does not check @p format for null. Caller must ensure the
+        * arguments match the format string. Returns without logging when
+        * _spdlog is NULL; the message is formatted with vsnprintf into a
+        * buffer of LOG_BUFFER_SIZE bytes.
+        */
+       void log_debug(const char *const format, ...) {
+               if(_spdlog == NULL)
+                       return;
+               FILL_BUFFER
+           _spdlog->debug(buffer);
+       }
+       /**
+        * @brief Log trace message
+        * @param format format string ('man printf' for syntax)
+        * @warning does not check @p format for null. Caller must ensure the
+        * arguments match the format string. Returns without logging when
+        * _spdlog is NULL; the message is formatted with vsnprintf into a
+        * buffer of LOG_BUFFER_SIZE bytes.
+        */
+       void log_trace(const char *const format, ...) {
+               if(_spdlog == NULL)
+                       return;
+               FILL_BUFFER
+           _spdlog->trace(buffer);
+       }
+
+protected:
+
+private:
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Logger(const Logger &parent);
+       Logger &operator=(const Logger &parent);
+       //! Constructor
+       /*!
+        * Create a logger: obtains a rotating file logger from spdlog with the
+        * given name, file name, size limit and file count, then sets its
+        * level to debug. Private, so instances are only created by getLogger().
+        * */
+       Logger(const std::string logger_name = LOG_NAME, const std::string 
filename = LOG_FILE_NAME, size_t max_file_size = DEFAULT_LOG_FILE_SIZE, size_t 
max_files = DEFAULT_LOG_FILE_NUMBER, bool force_flush = false) {
+               _spdlog = rotating_logger_mt(logger_name, filename, 
max_file_size, max_files, force_flush);
+               _spdlog->set_level((spdlog::level::level_enum) debug);
+       }
+       //! spdlog logger that receives every formatted message
+       std::shared_ptr<logger> _spdlog;
+
+       //! Singleton logger instance
+       static Logger *_logger;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/ProcessContext.h
----------------------------------------------------------------------
diff --git a/inc/ProcessContext.h b/inc/ProcessContext.h
new file mode 100644
index 0000000..927336d
--- /dev/null
+++ b/inc/ProcessContext.h
@@ -0,0 +1,99 @@
+/**
+ * @file ProcessContext.h
+ * ProcessContext class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_CONTEXT_H__
+#define __PROCESS_CONTEXT_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+
+#include "Logger.h"
+#include "Processor.h"
+
+//! ProcessContext Class
+/*!
+ * Wraps a Processor pointer and forwards each query to it. When no
+ * processor is set the methods return false / 0 or do nothing.
+ */
+class ProcessContext
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new process context associated with the
+        * processor/controller service/state manager
+        */
+       ProcessContext(Processor *processor = NULL)
+               : _processor(processor), _logger(Logger::getLogger()) {
+       }
+       //! Destructor
+       virtual ~ProcessContext();
+       //! Processor associated with the Process Context
+       Processor *getProcessor() {
+               return _processor;
+       }
+       //! Read a processor property into value; false without a processor
+       bool getProperty(std::string name, std::string &value) {
+               if (!_processor)
+                       return false;
+               return _processor->getProperty(name, value);
+       }
+       //! Whether the relationship is supported; false without a processor
+       bool isSupportedRelationship(Relationship relationship) {
+               if (!_processor)
+                       return false;
+               return _processor->isSupportedRelationship(relationship);
+       }
+       //! Whether the relationship is auto terminated; false without a processor
+       bool isAutoTerminated(Relationship relationship) {
+               if (!_processor)
+                       return false;
+               return _processor->isAutoTerminated(relationship);
+       }
+       //! Maximum concurrent tasks of the processor; 0 without a processor
+       uint8_t getMaxConcurrentTasks(void) {
+               if (!_processor)
+                       return 0;
+               return _processor->getMaxConcurrentTasks();
+       }
+       //! Yield based on the yield period; no-op without a processor
+       void yield() {
+               if (!_processor)
+                       return;
+               _processor->yield();
+       }
+
+protected:
+
+private:
+
+       //! Processor
+       Processor *_processor;
+       // Copy construction and assignment are declared private and left
+       // unimplemented: contexts are passed by reference or pointer only
+       ProcessContext(const ProcessContext &parent);
+       ProcessContext &operator=(const ProcessContext &parent);
+       //! Logger
+       Logger *_logger;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/ProcessSession.h
----------------------------------------------------------------------
diff --git a/inc/ProcessSession.h b/inc/ProcessSession.h
new file mode 100644
index 0000000..2ee1cf0
--- /dev/null
+++ b/inc/ProcessSession.h
@@ -0,0 +1,112 @@
+/**
+ * @file ProcessSession.h
+ * ProcessSession class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESS_SESSION_H__
+#define __PROCESS_SESSION_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "Logger.h"
+#include "Processor.h"
+#include "ProcessContext.h"
+#include "FlowFileRecord.h"
+#include "Exception.h"
+
+//! ProcessSession Class
+class ProcessSession
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new process session bound to the given process context.
+        * The context defaults to NULL; a NULL context (or a context without a
+        * processor) is tolerated and is reported as "unknown processor" in the
+        * trace message instead of being dereferenced.
+        */
+       ProcessSession(ProcessContext *processContext = NULL) : _processContext(processContext) {
+               _logger = Logger::getLogger();
+               // The default argument makes a NULL context legal, so resolve the
+               // processor defensively before asking it for its name.
+               Processor *processor = _processContext ? _processContext->getProcessor() : NULL;
+               _logger->log_trace("ProcessSession created for %s",
+                               processor ? processor->getName().c_str() : "unknown processor");
+       }
+       //! Destructor
+       virtual ~ProcessSession();
+       //! Commit the session
+       void commit();
+       //! Roll Back the session
+       void rollback();
+       //!
+       //! Get the FlowFile from the highest priority queue
+       FlowFileRecord *get();
+       //! Create a new UUID FlowFile with no content resource claim and without parent
+       FlowFileRecord *create();
+       //! Create a new UUID FlowFile with no content resource claim and inherit all attributes from parent
+       FlowFileRecord *create(FlowFileRecord *parent);
+       //! Clone a new UUID FlowFile from parent both for content resource claim and attributes
+       FlowFileRecord *clone(FlowFileRecord *parent);
+       //! Clone a new UUID FlowFile from parent for attributes and sub set of parent content resource claim
+       FlowFileRecord *clone(FlowFileRecord *parent, long offset, long size);
+       //! Duplicate a FlowFile with the same UUID and all attributes and content resource claim for the roll back of the session
+       FlowFileRecord *duplicate(FlowFileRecord *orignal);
+       //! Transfer the FlowFile to the relationship
+       void transfer(FlowFileRecord *flow, Relationship relationship);
+       //! Put Attribute
+       void putAttribute(FlowFileRecord *flow, std::string key, std::string value);
+       //! Remove Attribute
+       void removeAttribute(FlowFileRecord *flow, std::string key);
+       //! Remove Flow File
+       void remove(FlowFileRecord *flow);
+       //! Execute the given read callback against the content
+       void read(FlowFileRecord *flow, InputStreamCallback callback);
+       //! Execute the given write callback against the content
+       void write(FlowFileRecord *flow, OutputStreamCallback callback);
+       //! Execute the given write/append callback against the content
+       void append(FlowFileRecord *flow, OutputStreamCallback callback);
+       //! Penalize the flow
+       void penalize(FlowFileRecord *flow);
+
+protected:
+       //! FlowFiles being modified by current process session
+       std::map<std::string, FlowFileRecord *> _updatedFlowFiles;
+       //! Copy of the original FlowFiles being modified by current process session as above
+       std::map<std::string, FlowFileRecord *> _originalFlowFiles;
+       //! FlowFiles being added by current process session
+       std::map<std::string, FlowFileRecord *> _addedFlowFiles;
+       //! FlowFiles being deleted by current process session
+       std::map<std::string, FlowFileRecord *> _deletedFlowFiles;
+       //! FlowFiles being transferred to the relationship
+       std::map<std::string, Relationship> _transferRelationship;
+       //! FlowFiles being cloned for multiple connections per relationship
+       std::map<std::string, FlowFileRecord *> _clonedFlowFiles;
+
+private:
+       //! ProcessContext (may be NULL, see constructor)
+       ProcessContext *_processContext;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       ProcessSession(const ProcessSession &parent);
+       ProcessSession &operator=(const ProcessSession &parent);
+       //! Logger
+       Logger *_logger;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/Processor.h
----------------------------------------------------------------------
diff --git a/inc/Processor.h b/inc/Processor.h
new file mode 100644
index 0000000..5848e74
--- /dev/null
+++ b/inc/Processor.h
@@ -0,0 +1,282 @@
+/**
+ * @file Processor.h
+ * Processor class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROCESSOR_H__
+#define __PROCESSOR_H__
+
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <algorithm>
+#include <set>
+
+#include "TimeUtil.h"
+#include "Property.h"
+#include "Relationship.h"
+#include "Connection.h"
+
+//! Forward declarations
+class ProcessContext;
+class ProcessSession;
+
+//! Minimum scheduling period in Nano Second
+#define MINIMUM_SCHEDULING_NANOS 30000
+
+//! Default yield period in second
+#define DEFAULT_YIELD_PERIOD_SECONDS 1
+
+//! Default penalization period in second
+#define DEFAULT_PENALIZATION_PERIOD_SECONDS 30
+
+/*!
+ * Indicates the valid values for the state of an entity
+ * with respect to scheduling the entity to run.
+ * A Processor stores one of these values through setScheduledState().
+ */
+enum ScheduledState {
+
+    /**
+     * Entity cannot be scheduled to run
+     */
+    DISABLED,
+    /**
+     * Entity can be scheduled to run but currently is not
+     */
+    STOPPED,
+    /**
+     * Entity is currently scheduled to run
+     */
+    RUNNING
+};
+
+/*!
+ * Scheduling Strategy
+ * A Processor stores one of these values through setSchedulingStrategy().
+ */
+enum SchedulingStrategy {
+       //! Event driven
+       EVENT_DRIVEN,
+       //! Timer driven
+       TIMER_DRIVEN,
+       //! Cron Driven
+       CRON_DRIVEN
+};
+
+//! Processor Class
+class Processor
+{
+       friend class ProcessContext;
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        * @param name processor name
+        * @param uuid processor identifier; defaults to NULL
+        */
+       Processor(std::string name, uuid_t uuid = NULL);
+       //! Destructor
+       virtual ~Processor();
+       //! Set Processor Name
+       void setName(std::string name) {
+               _name = name;
+       }
+       //! Get Processor Name
+       std::string getName(void) {
+               return (_name);
+       }
+       //! Set UUID; a NULL uuid is ignored and the current UUID is kept
+       void setUUID(uuid_t uuid) {
+               // uuid_t decays to a pointer when passed as a parameter, so it can be
+               // NULL (the constructor even defaults it to NULL). Guard it the same
+               // way getUUID() does instead of copying from a null pointer.
+               if (uuid)
+                       uuid_copy(_uuid, uuid);
+       }
+       //! Get UUID; copies it into the caller's buffer, returns false when that buffer is NULL
+       bool getUUID(uuid_t uuid) {
+               if (uuid)
+               {
+                       uuid_copy(uuid, _uuid);
+                       return true;
+               }
+               else
+                       return false;
+       }
+       //! Set the supported processor properties while the process is not running
+       bool setSupportedProperties(std::set<Property> properties);
+       //! Set the supported relationships while the process is not running
+       bool setSupportedRelationships(std::set<Relationship> relationships);
+       //! Get the supported property value by name
+       bool getProperty(std::string name, std::string &value);
+       //! Set the supported property value by name while the process is not running
+       bool setProperty(std::string name, std::string value);
+       //! Whether the relationship is supported
+       bool isSupportedRelationship(Relationship relationship);
+       //! Set the auto terminated relationships while the process is not running
+       bool setAutoTerminatedRelationships(std::set<Relationship> relationships);
+       //! Check whether the relationship is auto terminated
+       bool isAutoTerminated(Relationship relationship);
+       //! Check whether the processor is running
+       bool isRunning();
+       //! Set Processor Scheduled State
+       void setScheduledState(ScheduledState state) {
+               _state = state;
+       }
+       //! Get Processor Scheduled State
+       ScheduledState getScheduledState(void) {
+               return _state;
+       }
+       //! Set Processor Scheduling Strategy
+       void setSchedulingStrategy(SchedulingStrategy strategy) {
+               _strategy = strategy;
+       }
+       //! Get Processor Scheduling Strategy
+       SchedulingStrategy getSchedulingStrategy(void) {
+               return _strategy;
+       }
+       //! Set Processor Loss Tolerant
+       void setlossTolerant(bool lossTolerant) {
+               _lossTolerant = lossTolerant;
+       }
+       //! Get Processor Loss Tolerant
+       bool getlossTolerant(void) {
+               return _lossTolerant;
+       }
+       //! Set Processor Scheduling Period in Nano Second
+       void setSchedulingPeriodNano(uint64_t period) {
+               // Periods shorter than MINIMUM_SCHEDULING_NANOS are raised to that minimum
+               uint64_t minPeriod = MINIMUM_SCHEDULING_NANOS;
+               _schedulingPeriodNano = std::max(period, minPeriod);
+       }
+       //! Get Processor Scheduling Period in Nano Second
+       uint64_t getSchedulingPeriodNano(void) {
+               return _schedulingPeriodNano;
+       }
+       //! Set Processor Run Duration in Nano Second
+       void setRunDurationNano(uint64_t period) {
+               _runDurantionNano = period;
+       }
+       //! Get Processor Run Duration in Nano Second
+       uint64_t getRunDurationNano(void) {
+               return(_runDurantionNano);
+       }
+       //! Set Processor yield period in MilliSecond
+       void setYieldPeriodMsec(uint64_t period) {
+               _yieldPeriodMsec = period;
+       }
+       //! Get Processor yield period in MilliSecond
+       uint64_t getYieldPeriodMsec(void) {
+               return(_yieldPeriodMsec);
+       }
+       //! Set Processor penalization period in MilliSecond
+       void setPenalizationPeriodMsec(uint64_t period) {
+               _penalizationPeriodMsec = period;
+       }
+       //! Get Processor penalization period in MilliSecond
+       uint64_t getPenalizationPeriodMsec(void) {
+               return(_penalizationPeriodMsec);
+       }
+       //! Set Processor Maximum Concurrent Tasks
+       void setMaxConcurrentTasks(uint8_t tasks) {
+               _maxConcurrentTasks = tasks;
+       }
+       //! Get Processor Maximum Concurrent Tasks
+       uint8_t getMaxConcurrentTasks(void) {
+               return(_maxConcurrentTasks);
+       }
+       //! Set Trigger when empty
+       void setTriggerWhenEmpty(bool value) {
+               _triggerWhenEmpty = value;
+       }
+       //! Get Trigger when empty
+       bool getTriggerWhenEmpty(void) {
+               return(_triggerWhenEmpty);
+       }
+       //! Get Active Task Counts
+       uint8_t getActiveTasks(void) {
+               return(_activeTasks);
+       }
+       //! Yield based on the yield period
+       void yield();
+       //! Get incoming connections (returns a copy of the set)
+       std::set<Connection *> getIncomingConnections() {
+               return _incomingConnections;
+       }
+       //! Get outgoing connections based on relationship name
+       std::set<Connection *> getOutGoingConnections(std::string relationship);
+       //! Add connection
+       bool addConnection(Connection *connection);
+       //! Remove connection
+       void removeConnection(Connection *connection);
+
+public:
+       //! OnTrigger method, implemented by NiFi Processor Designer
+       virtual void onTrigger(ProcessContext *context, ProcessSession *session) = 0;
+       //! Initialize, overridden by NiFi Processor Designer; does nothing by default
+       virtual void initialize(void) {
+               return;
+       }
+
+protected:
+
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! Processor Name
+       std::string _name;
+       //! Supported properties
+       std::map<std::string, Property> _properties;
+       //! Supported relationships
+       std::map<std::string, Relationship> _relationships;
+       //! Autoterminated relationships
+       std::map<std::string, Relationship> _autoTerminatedRelationships;
+       //! Processor state
+       std::atomic<ScheduledState> _state;
+       //! Scheduling Strategy
+       std::atomic<SchedulingStrategy> _strategy;
+       //! lossTolerant
+       std::atomic<bool> _lossTolerant;
+       //! SchedulePeriod in Nano Seconds
+       std::atomic<uint64_t> _schedulingPeriodNano;
+       //! Run Duration in Nano Seconds
+       std::atomic<uint64_t> _runDurantionNano;
+       //! Yield Period in Milliseconds
+       std::atomic<uint64_t> _yieldPeriodMsec;
+       //! Penalization Period in MilliSecond
+       std::atomic<uint64_t> _penalizationPeriodMsec;
+       //! Maximum Concurrent Tasks
+       std::atomic<uint8_t> _maxConcurrentTasks;
+       //! Active Tasks
+       std::atomic<uint8_t> _activeTasks;
+       //! Trigger the Processor even if the incoming connection is empty
+       std::atomic<bool> _triggerWhenEmpty;
+       //! Incoming connections
+       std::set<Connection *> _incomingConnections;
+       //! Outgoing connections map based on Relationship name
+       std::map<std::string, std::set<Connection *>> _outGoingConnections;
+
+private:
+
+       //! Mutex for protection
+       std::mutex _mtx;
+       //! Yield Expiration
+       std::atomic<uint64_t> _yieldExpiration;
+       //! Logger
+       Logger *_logger;
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       Processor(const Processor &parent);
+       Processor &operator=(const Processor &parent);
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/Property.h
----------------------------------------------------------------------
diff --git a/inc/Property.h b/inc/Property.h
new file mode 100644
index 0000000..e5a6e2a
--- /dev/null
+++ b/inc/Property.h
@@ -0,0 +1,78 @@
+/**
+ * @file Property.h
+ * Processor Property class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __PROPERTY_H__
+#define __PROPERTY_H__
+
+#include <string>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+#include <set>
+
+//! Property Class
+class Property {
+
+public:
+       //! Constructor
+       /*!
+        * Create a new property
+        * @param name property name, also the ordering key used by operator<
+        * @param description human readable description
+        * @param value initial value
+        */
+       Property(const std::string &name, const std::string &description, const std::string &value)
+               : _name(name), _description(description), _value(value) {
+       }
+       Property();
+       //! Destructor
+       virtual ~Property();
+       // The getters are const so they can be called on const objects, e.g. the
+       // elements of a std::set<Property>, which are always const.
+       //! Get Name for the property
+       std::string getName() const {
+               return _name;
+       }
+       //! Get Description for the property
+       std::string getDescription() const {
+               return _description;
+       }
+       //! Get value for the property
+       std::string getValue() const {
+               return _value;
+       }
+       //! Set value for the property
+       void setValue(std::string value) {
+               _value = value;
+       }
+       //! Compare by name only; description and value do not take part in the ordering
+       bool operator < (const Property & right) const {
+               return _name < right._name;
+       }
+
+protected:
+       //! Name
+       std::string _name;
+       //! Description
+       std::string _description;
+       //! Value
+       std::string _value;
+
+private:
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/Relationship.h
----------------------------------------------------------------------
diff --git a/inc/Relationship.h b/inc/Relationship.h
new file mode 100644
index 0000000..bd36531
--- /dev/null
+++ b/inc/Relationship.h
@@ -0,0 +1,86 @@
+/**
+ * @file Relationship.h
+ * Relationship class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RELATIONSHIP_H__
+#define __RELATIONSHIP_H__
+
+#include <string>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+
+//! Undefined relationship for remote process group outgoing port and root process group incoming port
+#define UNDEFINED_RELATIONSHIP "undefined"
+
+//! Tell whether the given name is exactly the reserved undefined relationship name
+inline bool isRelationshipNameUndefined(std::string name)
+{
+       return name == UNDEFINED_RELATIONSHIP;
+}
+
+//! Relationship Class
+class Relationship {
+
+public:
+       //! Constructor
+       /*!
+        * Create a new relationship
+        * @param name relationship name, also the ordering key used by operator<
+        * @param description human readable description
+        */
+       Relationship(const std::string &name, const std::string &description)
+               : _name(name), _description(description) {
+       }
+       //! Default constructor: creates the undefined relationship with an empty description
+       Relationship()
+               : _name(UNDEFINED_RELATIONSHIP) {
+       }
+       //! Destructor
+       virtual ~Relationship();
+       // The accessors are const so they can be called on const objects, e.g. the
+       // elements of a std::set<Relationship>, which are always const.
+       //! Get Name for the relationship
+       std::string getName() const {
+               return _name;
+       }
+       //! Get Description for the relationship
+       std::string getDescription() const {
+               return _description;
+       }
+       //! Compare by name only; the description does not take part in the ordering
+       bool operator < (const Relationship & right) const {
+               return _name < right._name;
+       }
+       //! Whether it is an undefined relationship
+       bool isRelationshipUndefined() const
+       {
+               return isRelationshipNameUndefined(_name);
+       }
+
+protected:
+
+       //! Name
+       std::string _name;
+       //! Description
+       std::string _description;
+
+private:
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/ResourceClaim.h
----------------------------------------------------------------------
diff --git a/inc/ResourceClaim.h b/inc/ResourceClaim.h
new file mode 100644
index 0000000..ef94157
--- /dev/null
+++ b/inc/ResourceClaim.h
@@ -0,0 +1,82 @@
+/**
+ * @file ResourceClaim.h
+ * Resource Claim class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RESOURCE_CLAIM_H__
+#define __RESOURCE_CLAIM_H__
+
+#include <string>
+#include <uuid/uuid.h>
+#include <vector>
+#include <queue>
+#include <map>
+#include <mutex>
+#include <atomic>
+
+//! Default content directory
+#define DEFAULT_CONTENT_DIRECTORY "."
+
+//! ResourceClaim Class
+class ResourceClaim {
+
+public:
+       //! Constructor
+       /*!
+        * Create a new resource claim
+        */
+       ResourceClaim(const std::string contentDirectory);
+       //! Destructor
+       virtual ~ResourceClaim();
+       //! Increase the number of FlowFileRecords owning this claim
+       void increaseFlowFileRecordOwnedCount()
+       {
+               ++_flowFileRecordOwnedCount;
+       }
+       //! Decrease the number of FlowFileRecords owning this claim; stays at 0 once it reaches 0
+       void decreaseFlowFileRecordOwnedCount()
+       {
+               // The counter is unsigned, so a plain decrement at 0 would wrap to
+               // UINT64_MAX. Retry the compare-and-swap until the decrement is
+               // applied or the count is observed to be 0 already.
+               uint64_t current = _flowFileRecordOwnedCount.load();
+               while (current > 0 &&
+                               !_flowFileRecordOwnedCount.compare_exchange_weak(current, current - 1))
+               {
+                       // current was refreshed by compare_exchange_weak; try again
+               }
+       }
+       //! Get the content full path
+       std::string getContentFullPath()
+       {
+               return _contentFullPath;
+       }
+
+protected:
+       //! A global unique identifier
+       uuid_t _uuid;
+       //! A local unique identifier
+       uint64_t _id;
+       //! Full path to the content
+       std::string _contentFullPath;
+
+       //! How many FlowFileRecords own this claim
+       std::atomic<uint64_t> _flowFileRecordOwnedCount;
+
+private:
+       // Prevent default copy constructor and assignment operation
+       // Only support pass by reference or pointer
+       ResourceClaim(const ResourceClaim &parent);
+       ResourceClaim &operator=(const ResourceClaim &parent);
+
+       //! Local resource claim number
+       static std::atomic<uint64_t> _localResourceClaimNumber;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/TimeUtil.h
----------------------------------------------------------------------
diff --git a/inc/TimeUtil.h b/inc/TimeUtil.h
new file mode 100644
index 0000000..996498d
--- /dev/null
+++ b/inc/TimeUtil.h
@@ -0,0 +1,62 @@
+/**
+ * @file TimeUtil.h
+ * Basic Time Utility 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TIME_UTIL_H__
+#define __TIME_UTIL_H__
+
+#include <time.h>
+#include <sys/time.h>
+#include <string.h>
+
+#ifdef __MACH__
+#include <mach/clock.h>
+#include <mach/mach.h>
+#endif
+
+//! Current wall-clock time from gettimeofday(), in milliseconds.
+// Declared inline because this header is included from several translation
+// units; a plain definition here would produce duplicate symbols at link time.
+inline uint64_t getTimeMillis()
+{
+       timeval time;
+       gettimeofday(&time, NULL);
+       // Widen to 64 bits before multiplying so a 32-bit time_t cannot overflow
+       return (static_cast<uint64_t>(time.tv_sec) * 1000) + (static_cast<uint64_t>(time.tv_usec) / 1000);
+}
+
+//! Current wall-clock time in nanoseconds (CLOCK_REALTIME, or the Mach calendar clock on OS X).
+// Declared inline because this header is included from several translation
+// units; a plain definition here would produce duplicate symbols at link time.
+inline uint64_t getTimeNano()
+{
+       struct timespec ts;
+
+#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time
+       clock_serv_t cclock;
+       mach_timespec_t mts;
+       host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
+       clock_get_time(cclock, &mts);
+       mach_port_deallocate(mach_task_self(), cclock);
+       ts.tv_sec = mts.tv_sec;
+       ts.tv_nsec = mts.tv_nsec;
+#else
+       clock_gettime(CLOCK_REALTIME, &ts);
+#endif
+
+       // Widen to 64 bits before multiplying: seconds * 1e9 does not fit in a 32-bit time_t
+       return (static_cast<uint64_t>(ts.tv_sec) * 1000000000ULL + static_cast<uint64_t>(ts.tv_nsec));
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/async_logger.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/async_logger.h b/inc/spdlog/async_logger.h
new file mode 100644
index 0000000..517ce92
--- /dev/null
+++ b/inc/spdlog/async_logger.h
@@ -0,0 +1,90 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+// Very fast asynchronous logger (millions of logs per second on an average 
desktop)
+// Uses pre allocated lockfree queue for maximum throughput even under large 
number of threads.
+// Creates a single back thread to pop messages from the queue and log them.
+//
+// Upon each log write the logger:
+//    1. Checks if its log level is enough to log the message
+//    2. Push a new copy of the message to a queue (or block the caller until 
space is available in the queue)
+//    3. will throw spdlog_ex upon log exceptions
+// Upon destruction, logs all remaining messages in the queue before destructing.
+
+#include <chrono>
+#include <functional>
+#include "common.h"
+#include "logger.h"
+#include "spdlog.h"
+
+
+namespace spdlog
+{
+
+namespace details
+{
+class async_log_helper;
+}
+
+// Asynchronous logger: derives from logger and owns a details::async_log_helper.
+// The three constructors differ only in how the sinks are supplied; all of them
+// take the queue size, an overflow policy (default block_retry), an optional
+// worker warm-up callback (default nullptr) and a flush interval (default zero).
+class async_logger :public logger
+{
+public:
+    // Sinks given as an iterator range [begin, end)
+    template<class It>
+    async_logger(const std::string& name,
+                 const It& begin,
+                 const It& end,
+                 size_t queue_size,
+                 const async_overflow_policy overflow_policy =  
async_overflow_policy::block_retry,
+                 const std::function<void()>& worker_warmup_cb = nullptr,
+                 const std::chrono::milliseconds& flush_interval_ms = 
std::chrono::milliseconds::zero());
+
+    // Sinks given as a sinks_init_list
+    async_logger(const std::string& logger_name,
+                 sinks_init_list sinks,
+                 size_t queue_size,
+                 const async_overflow_policy overflow_policy = 
async_overflow_policy::block_retry,
+                 const std::function<void()>& worker_warmup_cb = nullptr,
+                 const std::chrono::milliseconds& flush_interval_ms = 
std::chrono::milliseconds::zero());
+
+    // A single sink
+    async_logger(const std::string& logger_name,
+                 sink_ptr single_sink,
+                 size_t queue_size,
+                 const async_overflow_policy overflow_policy =  
async_overflow_policy::block_retry,
+                 const std::function<void()>& worker_warmup_cb = nullptr,
+                 const std::chrono::milliseconds& flush_interval_ms = 
std::chrono::milliseconds::zero());
+
+
+protected:
+    // Overrides of the base logger hooks; implemented in details/async_logger_impl.h
+    void _log_msg(details::log_msg& msg) override;
+    void _set_formatter(spdlog::formatter_ptr msg_formatter) override;
+    void _set_pattern(const std::string& pattern) override;
+
+private:
+    // Helper object owned exclusively by this logger
+    std::unique_ptr<details::async_log_helper> _async_log_helper;
+};
+}
+
+
+#include "./details/async_logger_impl.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/common.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/common.h b/inc/spdlog/common.h
new file mode 100644
index 0000000..cde5a9e
--- /dev/null
+++ b/inc/spdlog/common.h
@@ -0,0 +1,116 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+#include <string>
+#include <initializer_list>
+#include <chrono>
+#include <memory>
+
+//visual studio does not support noexcept yet
+#ifndef _MSC_VER
+#define SPDLOG_NOEXCEPT noexcept
+#else
+#define SPDLOG_NOEXCEPT throw()
+#endif
+
+
+namespace spdlog
+{
+
+class formatter;
+
+namespace sinks
+{
+class sink;
+}
+
+// Common types across the lib
+using log_clock = std::chrono::system_clock;
+using sink_ptr = std::shared_ptr < sinks::sink > ;
+using sinks_init_list = std::initializer_list < sink_ptr > ;
+using formatter_ptr = std::shared_ptr<spdlog::formatter>;
+
+
+//Log level enum
+namespace level
+{
+typedef enum // the numeric values double as indices into level_names / short_level_names below
+{
+    trace    = 0,
+    debug    = 1,
+    info     = 2,
+    notice   = 3,
+    warn     = 4,
+    err      = 5,
+    critical = 6,
+    alert    = 7,
+    emerg    = 8,
+    off      = 9 // async_log_helper also queues a message with this level to stop its worker thread
+} level_enum;
+
+static const char* level_names[] { "trace", "debug", "info", "notice", 
"warning", "error", "critical", "alert", "emerg", "off"};
+
+static const char* short_level_names[] { "T", "D", "I", "N", "W", "E", "C", 
"A", "M", "O"};
+
+inline const char* to_str(spdlog::level::level_enum l) // full name of level l
+{
+    return level_names[l]; // no bounds check: l must be one of the enumerators above
+}
+
+inline const char* to_short_str(spdlog::level::level_enum l) // one-letter name of level l
+{
+    return short_level_names[l]; // no bounds check: l must be one of the enumerators above
+}
+} //level
+
+
+//
+// Async overflow policy - block by default.
+//
+enum class async_overflow_policy
+{
+    block_retry, // Block / yield / sleep until message can be enqueued
+    discard_log_msg // Discard the message if enqueue fails
+};
+
+
+//
+// Log exception
+//
+class spdlog_ex : public std::exception
+{
+    // text returned by what()
+    std::string _msg;
+
+public:
+    // keep a copy of the error text
+    spdlog_ex(const std::string& msg)
+        : _msg(msg)
+    {
+    }
+
+    // pointer into the stored text
+    const char* what() const SPDLOG_NOEXCEPT override
+    {
+        return _msg.c_str();
+    }
+};
+
+} //spdlog

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/details/async_log_helper.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/details/async_log_helper.h 
b/inc/spdlog/details/async_log_helper.h
new file mode 100644
index 0000000..59c1b2d
--- /dev/null
+++ b/inc/spdlog/details/async_log_helper.h
@@ -0,0 +1,326 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+// async log helper :
+// Process logs asynchronously using a back thread.
+//
+// If the internal queue of log messages reaches its max size,
+// then the client call will block until there is more room
+// (or, with async_overflow_policy::discard_log_msg, the message is dropped).
+//
+// If the back thread throws during logging, a spdlog::spdlog_ex exception
+// will be thrown in the client's thread when it tries to log the next message
+
+#pragma once
+
+#include <chrono>
+#include <thread>
+#include <atomic>
+#include <functional>
+
+#include "../common.h"
+#include "../sinks/sink.h"
+#include "./mpmc_bounded_q.h"
+#include "./log_msg.h"
+#include "./format.h"
+#include "os.h"
+
+
+namespace spdlog
+{
+namespace details
+{
+
+// Asynchronous logging back end: log() pushes messages onto a bounded queue
+// and a worker thread pops them, formats them and passes them to the sinks.
+class async_log_helper
+{
+    // Async msg to move to/from the queue
+    // Movable only. should never be copied
+    struct async_msg
+    {
+        std::string logger_name;
+        level::level_enum level;
+        log_clock::time_point time;
+        size_t thread_id;
+        std::string txt;
+
+        async_msg() = default;
+        ~async_msg() = default;
+
+        // Move construction carries over every field, thread_id included,
+        // so it agrees with move assignment below.
+        // Initializers are listed in member declaration order.
+        async_msg(async_msg&& other) SPDLOG_NOEXCEPT :
+            logger_name(std::move(other.logger_name)),
+            level(other.level),
+            time(std::move(other.time)),
+            thread_id(other.thread_id),
+            txt(std::move(other.txt))
+        {}
+
+        async_msg& operator=(async_msg&& other) SPDLOG_NOEXCEPT
+        {
+            logger_name = std::move(other.logger_name);
+            level = other.level;
+            time = std::move(other.time);
+            thread_id = other.thread_id;
+            txt = std::move(other.txt);
+            return *this;
+        }
+        // never copy or assign. should only be moved..
+        async_msg(const async_msg&) = delete;
+        async_msg& operator=(const async_msg&) = delete;
+
+        // construct from log_msg (copies the raw, unformatted text)
+        async_msg(const details::log_msg& m) :
+            logger_name(m.logger_name),
+            level(m.level),
+            time(m.time),
+            thread_id(m.thread_id),
+            txt(m.raw.data(), m.raw.size())
+        {}
+
+
+        // copy into log_msg: clears msg, then fills its fields and raw text
+        void fill_log_msg(log_msg &msg)
+        {
+            msg.clear();
+            msg.logger_name = logger_name;
+            msg.level = level;
+            msg.time = time;
+            msg.thread_id = thread_id;
+            msg.raw << txt;
+        }
+    };
+
+public:
+
+    using item_type = async_msg;
+    using q_type = details::mpmc_bounded_queue<item_type>;
+
+    using clock = std::chrono::steady_clock;
+
+
+    // Stores the settings and starts the worker thread.
+    async_log_helper(formatter_ptr formatter,
+                     const std::vector<sink_ptr>& sinks,
+                     size_t queue_size,
+                     const async_overflow_policy overflow_policy = async_overflow_policy::block_retry,
+                     const std::function<void()>& worker_warmup_cb = nullptr,
+                     const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero());
+
+    // queue msg for the worker thread; throws a pending worker exception first
+    void log(const details::log_msg& msg);
+
+    // stop logging and join the back thread
+    ~async_log_helper();
+
+    void set_formatter(formatter_ptr);
+
+
+private:
+    formatter_ptr _formatter;
+    std::vector<std::shared_ptr<sinks::sink>> _sinks;
+
+    // queue of messages to log
+    q_type _q;
+
+    // last exception thrown from the worker thread
+    std::shared_ptr<spdlog_ex> _last_workerthread_ex;
+
+    // overflow policy
+    const async_overflow_policy _overflow_policy;
+
+    // worker thread warmup callback - one can set thread priority, affinity, etc
+    const std::function<void()> _worker_warmup_cb;
+
+    // auto periodic sink flush parameter
+    const std::chrono::milliseconds _flush_interval_ms;
+
+    // worker thread (declared last, so it is initialized after the members above)
+    std::thread _worker_thread;
+
+    // throw the last worker thread exception, if one was recorded
+    void throw_if_bad_worker();
+
+    // worker thread main loop
+    void worker_loop();
+
+    // pop next message from the queue and process it
+    // return true if the worker should keep running, will set the last_pop to the pop time
+    bool process_next_msg(log_clock::time_point& last_pop, log_clock::time_point& last_flush);
+
+    void handle_flush_interval(log_clock::time_point& now, log_clock::time_point& last_flush);
+
+    // sleep, yield or return immediately using the time passed since last message as a hint
+    static void sleep_or_yield(const spdlog::log_clock::time_point& now, const log_clock::time_point& last_op_time);
+
+};
+}
+}
+
+///////////////////////////////////////////////////////////////////////////////
+// async_log_helper class implementation
+///////////////////////////////////////////////////////////////////////////////
+inline spdlog::details::async_log_helper::async_log_helper(
+    formatter_ptr formatter,
+    const std::vector<sink_ptr>& sinks,
+    size_t queue_size,
+    const async_overflow_policy overflow_policy,
+    const std::function<void()>& worker_warmup_cb,
+    const std::chrono::milliseconds& flush_interval_ms)
+    : _formatter(formatter)
+    , _sinks(sinks)
+    , _q(queue_size)
+    , _overflow_policy(overflow_policy)
+    , _worker_warmup_cb(worker_warmup_cb)
+    , _flush_interval_ms(flush_interval_ms)
+    , _worker_thread(&async_log_helper::worker_loop, this) // starts worker_loop() on a new thread
+{
+}
+
+// Send to the worker thread termination message(level=off)
+// and wait for it to finish gracefully
+inline spdlog::details::async_log_helper::~async_log_helper()
+{
+    // Queue the termination message. log() calls throw_if_bad_worker() first,
+    // so it throws when the worker recorded an exception; that must not skip
+    // the join below.
+    try
+    {
+        log(log_msg(level::off));
+    }
+    catch (...) //Dont let a failed enqueue escape the destructor
+    {}
+
+    // Always join a joinable worker: destroying a joinable std::thread calls
+    // std::terminate().
+    // NOTE(review): if log() failed for a reason other than a dead worker, the
+    // termination message was not queued and this join can block - confirm.
+    try
+    {
+        if (_worker_thread.joinable())
+            _worker_thread.join();
+    }
+    catch (...) //Dont crash if the join itself fails
+    {}
+}
+
+
+//Try to push and block until succeeded
+inline void spdlog::details::async_log_helper::log(const details::log_msg& msg)
+{
+    // surface an exception recorded by the worker thread before queueing
+    throw_if_bad_worker();
+
+    async_msg pending(msg);
+    if (_q.enqueue(std::move(pending)))
+        return;
+
+    // enqueue failed: under the discard policy the message is simply dropped
+    if (_overflow_policy == async_overflow_policy::discard_log_msg)
+        return;
+
+    // otherwise retry until it succeeds, backing off according to the time
+    // elapsed since the first failed attempt
+    // (assumes a failed enqueue leaves `pending` intact - confirm against mpmc_bounded_queue)
+    const auto first_failure = details::os::now();
+    do
+    {
+        const auto now = details::os::now();
+        sleep_or_yield(now, first_failure);
+    }
+    while (!_q.enqueue(std::move(pending)));
+}
+
+inline void spdlog::details::async_log_helper::worker_loop()
+{
+    try
+    {
+        // optional user hook, run once on the worker thread before any message
+        if (_worker_warmup_cb)
+        {
+            _worker_warmup_cb();
+        }
+
+        auto last_flush = details::os::now();
+        auto last_pop = last_flush;
+
+        // keep going until process_next_msg() reports that the worker should stop
+        bool keep_running = true;
+        while (keep_running)
+        {
+            keep_running = process_next_msg(last_pop, last_flush);
+        }
+    }
+    catch (const std::exception& ex)
+    {
+        // remember the failure; throw_if_bad_worker() rethrows it later
+        const std::string what = std::string("async_logger worker thread exception: ") + ex.what();
+        _last_workerthread_ex = std::make_shared<spdlog_ex>(what);
+    }
+    catch (...)
+    {
+        _last_workerthread_ex = std::make_shared<spdlog_ex>("async_logger worker thread exception");
+    }
+}
+
+// process next message in the queue
+// return true if this thread should still be active (no msg with level::off was received)
+inline bool spdlog::details::async_log_helper::process_next_msg(
+    log_clock::time_point& last_pop,
+    log_clock::time_point& last_flush)
+{
+    async_msg incoming_async_msg;
+    log_msg incoming_log_msg;
+
+    if (!_q.dequeue(incoming_async_msg))
+    {
+        // nothing dequeued: run the periodic flush check, then back off
+        auto now = details::os::now();
+        handle_flush_interval(now, last_flush);
+        sleep_or_yield(now, last_pop);
+        return true;
+    }
+
+    last_pop = details::os::now();
+
+    // level::off marks the termination message (the destructor queues one)
+    if (incoming_async_msg.level == level::off)
+        return false;
+
+    // rebuild a log_msg, format it once, then hand it to every sink
+    incoming_async_msg.fill_log_msg(incoming_log_msg);
+    _formatter->format(incoming_log_msg);
+    for (auto &sink : _sinks)
+        sink->log(incoming_log_msg);
+
+    return true;
+}
+
+inline void spdlog::details::async_log_helper::handle_flush_interval(
+    log_clock::time_point& now,
+    log_clock::time_point& last_flush)
+{
+    // a zero interval means periodic flushing is switched off
+    if (_flush_interval_ms == std::chrono::milliseconds::zero())
+        return;
+
+    // the interval has not elapsed yet
+    if (now - last_flush < _flush_interval_ms)
+        return;
+
+    for (auto &sink : _sinks)
+        sink->flush();
+
+    // refresh both caller-owned timestamps after flushing
+    last_flush = details::os::now();
+    now = last_flush;
+}
+// replace the formatter that process_next_msg() applies to each message
+inline void spdlog::details::async_log_helper::set_formatter(formatter_ptr msg_formatter)
+{
+    _formatter = msg_formatter;
+}
+
+
+// sleep, yield or return immediately using the time passed since last message as a hint
+inline void spdlog::details::async_log_helper::sleep_or_yield(
+    const spdlog::log_clock::time_point& now,
+    const spdlog::log_clock::time_point& last_op_time)
+{
+    using std::chrono::milliseconds;
+
+    const auto idle = now - last_op_time;
+
+    if (idle <= milliseconds(1))
+    {
+        // at most 1 ms since the last operation: return at once so the caller keeps spinning
+        return;
+    }
+
+    if (idle <= milliseconds(10))
+    {
+        // at most 10 ms: only give up the time slice
+        std::this_thread::yield();
+        return;
+    }
+
+    if (idle <= milliseconds(100))
+    {
+        // at most 100 ms: sleep for half of the time since the last operation
+        std::this_thread::sleep_for(idle / 2);
+        return;
+    }
+
+    // longer than that: sleep for 100 ms
+    std::this_thread::sleep_for(milliseconds(100));
+}
+
+// throw if the worker thread threw an exception or not active
+inline void spdlog::details::async_log_helper::throw_if_bad_worker()
+{
+    // nothing recorded by the worker thread
+    if (!_last_workerthread_ex)
+        return;
+
+    // take the stored exception out of the member (so it is reported once), then throw a copy of it
+    auto pending = std::move(_last_workerthread_ex);
+    throw *pending;
+}
+
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/details/async_logger_impl.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/details/async_logger_impl.h 
b/inc/spdlog/details/async_logger_impl.h
new file mode 100644
index 0000000..f60407e
--- /dev/null
+++ b/inc/spdlog/details/async_logger_impl.h
@@ -0,0 +1,82 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+
+#include "./async_log_helper.h"
+
+//
+// Async Logger implementation
+// Use single async_sink (queue) to perform the logging in a worker thread
+//
+
+
+// Iterator-range constructor: builds the logger base from [begin, end) and
+// creates the async helper from _formatter and _sinks (presumably members of
+// the logger base - confirm) plus the queue/worker settings.
+template<class It>
+inline spdlog::async_logger::async_logger(
+    const std::string& logger_name,
+    const It& begin,
+    const It& end,
+    size_t queue_size,
+    const async_overflow_policy overflow_policy,
+    const std::function<void()>& worker_warmup_cb,
+    const std::chrono::milliseconds& flush_interval_ms)
+    : logger(logger_name, begin, end)
+    , _async_log_helper(new details::async_log_helper(
+          _formatter, _sinks, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms))
+{
+}
+
+// Initializer-list constructor: delegates to the iterator-range constructor.
+inline spdlog::async_logger::async_logger(
+    const std::string& logger_name,
+    sinks_init_list sinks,
+    size_t queue_size,
+    const async_overflow_policy overflow_policy,
+    const std::function<void()>& worker_warmup_cb,
+    const std::chrono::milliseconds& flush_interval_ms)
+    : async_logger(logger_name, sinks.begin(), sinks.end(), queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms)
+{
+}
+
+// Single-sink constructor: wraps the sink in a one-element list and delegates.
+inline spdlog::async_logger::async_logger(
+    const std::string& logger_name,
+    sink_ptr single_sink,
+    size_t queue_size,
+    const async_overflow_policy overflow_policy,
+    const std::function<void()>& worker_warmup_cb,
+    const std::chrono::milliseconds& flush_interval_ms)
+    : async_logger(logger_name, { single_sink }, queue_size, overflow_policy, worker_warmup_cb, flush_interval_ms)
+{
+}
+
+
+// Store the new formatter and pass the same object on to the async helper.
+inline void spdlog::async_logger::_set_formatter(spdlog::formatter_ptr msg_formatter)
+{
+    _formatter = msg_formatter;
+    _async_log_helper->set_formatter(_formatter);
+}
+
+// Build a pattern_formatter from the pattern, store it and pass it on to the async helper.
+inline void spdlog::async_logger::_set_pattern(const std::string& pattern)
+{
+    _formatter = std::make_shared<pattern_formatter>(pattern);
+    _async_log_helper->set_formatter(_formatter);
+}
+
+
+// Pass the message to the async helper, which queues it for the worker thread.
+inline void spdlog::async_logger::_log_msg(details::log_msg& msg)
+{
+    _async_log_helper->log(msg);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/details/file_helper.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/details/file_helper.h b/inc/spdlog/details/file_helper.h
new file mode 100644
index 0000000..8e1f600
--- /dev/null
+++ b/inc/spdlog/details/file_helper.h
@@ -0,0 +1,144 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+// Helper class for file sink
+// When failing to open a file, retry several times(5) with small delay between the tries(10 ms)
+// Can be set to auto flush on every line
+// Throw spdlog_ex exception on errors
+
+#include <string>
+#include <thread>
+#include <chrono>
+#include "os.h"
+
+
+
+
+namespace spdlog
+{
+namespace details
+{
+
+// Thin wrapper around a C FILE* used for writing formatted log messages.
+// Not copyable; the file is closed when the object is destroyed.
+class file_helper
+{
+public:
+    const int open_tries = 5;      // number of attempts made by open()
+    const int open_interval = 10;  // delay between attempts, in milliseconds
+
+    // force_flush: when true, write() flushes the file after every message
+    explicit file_helper(bool force_flush):
+        _fd(nullptr),
+        _force_flush(force_flush)
+    {}
+
+    file_helper(const file_helper&) = delete;
+    file_helper& operator=(const file_helper&) = delete;
+
+    ~file_helper()
+    {
+        close();
+    }
+
+
+    // Close any open file, then open fname in "wb" (truncate) or "ab" mode.
+    // Retries open_tries times, sleeping open_interval ms after each failure.
+    // Throws spdlog_ex when every attempt fails.
+    void open(const std::string& fname, bool truncate=false)
+    {
+
+        close();
+        const char* mode = truncate ? "wb" : "ab";
+        _filename = fname;
+        for (int tries = 0; tries < open_tries; ++tries)
+        {
+            // os::fopen_s is treated as successful when it returns zero
+            if(!os::fopen_s(&_fd, fname, mode))
+                return;
+
+            std::this_thread::sleep_for(std::chrono::milliseconds(open_interval));
+        }
+
+        throw spdlog_ex("Failed opening file " + fname + " for writing");
+    }
+
+    // Re-open the file name remembered by the last open().
+    // Throws spdlog_ex when open() was never called.
+    void reopen(bool truncate)
+    {
+        if(_filename.empty())
+            throw spdlog_ex("Failed re opening file - was not opened before");
+        open(_filename, truncate);
+
+    }
+
+    // Flush the open file; does nothing when no file is open
+    // (std::fflush(nullptr) would flush every open output stream instead).
+    void flush() {
+        if (_fd)
+            std::fflush(_fd);
+    }
+
+    void close()
+    {
+        if (_fd)
+        {
+            std::fclose(_fd);
+            _fd = nullptr;
+        }
+    }
+
+    // Write msg.formatted to the file, flushing afterwards when force_flush
+    // was requested. Throws spdlog_ex when no file is open or on a short write.
+    void write(const log_msg& msg)
+    {
+        // passing a null stream to fwrite is undefined behaviour
+        if (!_fd)
+            throw spdlog_ex("Failed writing to file " + _filename + " - file is not open");
+
+        size_t size = msg.formatted.size();
+        auto data = msg.formatted.data();
+        if(std::fwrite(data, 1, size, _fd) != size)
+            throw spdlog_ex("Failed writing to file " + _filename);
+
+        if(_force_flush)
+            std::fflush(_fd);
+
+    }
+
+    const std::string& filename() const
+    {
+        return _filename;
+    }
+
+    // true when the file can be opened for reading
+    static bool file_exists(const std::string& name)
+    {
+        FILE* file = nullptr;
+        if (!os::fopen_s(&file, name.c_str(), "r"))
+        {
+            fclose(file);
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+private:
+    FILE* _fd;
+    std::string _filename;
+    bool _force_flush;
+
+
+};
+}
+}
+

Reply via email to