Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/MINIFI-6 c3444a6b7 -> 31c9a1b61 (forced update)


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/null_sink.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/sinks/null_sink.h b/inc/spdlog/sinks/null_sink.h
new file mode 100644
index 0000000..992b3b7
--- /dev/null
+++ b/inc/spdlog/sinks/null_sink.h
@@ -0,0 +1,52 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+#include <mutex>
+#include "./base_sink.h"
+#include "../details/null_mutex.h"
+
+
+namespace spdlog
+{
+namespace sinks
+{
+
+template <class Mutex>
+class null_sink : public base_sink < Mutex >
+{
+protected:
+    void _sink_it(const details::log_msg&) override
+    {}
+
+    void flush() override
+    {}
+
+};
+typedef null_sink<details::null_mutex> null_sink_st;
+typedef null_sink<std::mutex> null_sink_mt;
+
+}
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/ostream_sink.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/sinks/ostream_sink.h b/inc/spdlog/sinks/ostream_sink.h
new file mode 100644
index 0000000..f2fe3b2
--- /dev/null
+++ b/inc/spdlog/sinks/ostream_sink.h
@@ -0,0 +1,67 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+#include <ostream>
+#include <mutex>
+#include <memory>
+
+#include "../details/null_mutex.h"
+#include "./base_sink.h"
+
+namespace spdlog
+{
+namespace sinks
+{
+template<class Mutex>
+class ostream_sink: public base_sink<Mutex>
+{
+public:
+    explicit ostream_sink(std::ostream& os, bool force_flush=false) 
:_ostream(os), _force_flush(force_flush) {}
+    ostream_sink(const ostream_sink&) = delete;
+    ostream_sink& operator=(const ostream_sink&) = delete;
+    virtual ~ostream_sink() = default;
+
+protected:
+    void _sink_it(const details::log_msg& msg) override
+    {
+        _ostream.write(msg.formatted.data(), msg.formatted.size());
+        if (_force_flush)
+            _ostream.flush();
+    }
+
+    void flush() override
+    {
+        _ostream.flush();
+    }
+
+    std::ostream& _ostream;
+    bool _force_flush;
+};
+
+typedef ostream_sink<std::mutex> ostream_sink_mt;
+typedef ostream_sink<details::null_mutex> ostream_sink_st;
+}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/sink.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/sinks/sink.h b/inc/spdlog/sinks/sink.h
new file mode 100644
index 0000000..88c423a
--- /dev/null
+++ b/inc/spdlog/sinks/sink.h
@@ -0,0 +1,42 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+#include "../details/log_msg.h"
+
+namespace spdlog
+{
+namespace sinks
+{
+class sink
+{
+public:
+    virtual ~sink() {}
+    virtual void log(const details::log_msg& msg) = 0;
+    virtual void flush() = 0;
+};
+}
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/stdout_sinks.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/sinks/stdout_sinks.h b/inc/spdlog/sinks/stdout_sinks.h
new file mode 100644
index 0000000..5ad06c2
--- /dev/null
+++ b/inc/spdlog/sinks/stdout_sinks.h
@@ -0,0 +1,71 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+#include <iostream>
+#include <mutex>
+#include "./ostream_sink.h"
+#include "../details/null_mutex.h"
+
+namespace spdlog
+{
+namespace sinks
+{
+
+template <class Mutex>
+class stdout_sink : public ostream_sink<Mutex>
+{
+    using MyType = stdout_sink<Mutex>;
+public:
+    stdout_sink() : ostream_sink<Mutex>(std::cout, true) {}
+    static std::shared_ptr<MyType> instance()
+    {
+        static std::shared_ptr<MyType> instance = std::make_shared<MyType>();
+        return instance;
+    }
+};
+
+typedef stdout_sink<details::null_mutex> stdout_sink_st;
+typedef stdout_sink<std::mutex> stdout_sink_mt;
+
+
+template <class Mutex>
+class stderr_sink : public ostream_sink<Mutex>
+{
+    using MyType = stderr_sink<Mutex>;
+public:
+    stderr_sink() : ostream_sink<Mutex>(std::cerr, true) {}
+    static std::shared_ptr<MyType> instance()
+    {
+        static std::shared_ptr<MyType> instance = std::make_shared<MyType>();
+        return instance;
+    }
+
+};
+
+typedef stderr_sink<std::mutex> stderr_sink_mt;
+typedef stderr_sink<details::null_mutex> stderr_sink_st;
+}
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/syslog_sink.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/sinks/syslog_sink.h b/inc/spdlog/sinks/syslog_sink.h
new file mode 100644
index 0000000..37b6513
--- /dev/null
+++ b/inc/spdlog/sinks/syslog_sink.h
@@ -0,0 +1,102 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+#pragma once
+
+#ifdef __linux__
+
+#include <array>
+#include <string>
+#include <syslog.h>
+
+#include "./sink.h"
+#include "../common.h"
+#include "../details/log_msg.h"
+
+
+namespace spdlog
+{
+namespace sinks
+{
+/**
+ * Sink that write to syslog using the `syscall()` library call.
+ *
+ * Locking is not needed, as `syslog()` itself is thread-safe.
+ */
+class syslog_sink : public sink
+{
+public:
+    //
+    syslog_sink(const std::string& ident = "", int syslog_option=0, int 
syslog_facility=LOG_USER):
+        _ident(ident)
+    {
+        _priorities[static_cast<int>(level::trace)] = LOG_DEBUG;
+        _priorities[static_cast<int>(level::debug)] = LOG_DEBUG;
+        _priorities[static_cast<int>(level::info)] = LOG_INFO;
+        _priorities[static_cast<int>(level::notice)] = LOG_NOTICE;
+        _priorities[static_cast<int>(level::warn)] = LOG_WARNING;
+        _priorities[static_cast<int>(level::err)] = LOG_ERR;
+        _priorities[static_cast<int>(level::critical)] = LOG_CRIT;
+        _priorities[static_cast<int>(level::alert)] = LOG_ALERT;
+        _priorities[static_cast<int>(level::emerg)] = LOG_EMERG;
+        _priorities[static_cast<int>(level::off)] = LOG_INFO;
+
+        //set ident to be program name if empty
+        ::openlog(_ident.empty()? nullptr:_ident.c_str(), syslog_option, 
syslog_facility);
+    }
+    ~syslog_sink()
+    {
+        ::closelog();
+    }
+
+    syslog_sink(const syslog_sink&) = delete;
+    syslog_sink& operator=(const syslog_sink&) = delete;
+
+    void log(const details::log_msg &msg) override
+    {
+        ::syslog(syslog_prio_from_level(msg), "%s", 
msg.formatted.str().c_str());
+    }
+
+    void flush() override
+    {
+    }
+
+
+private:
+    std::array<int, 10> _priorities;
+    //must store the ident because the man says openlog might use the pointer 
as is and not a string copy
+    const std::string _ident;
+
+    //
+    // Simply maps spdlog's log level to syslog priority level.
+    //
+    int syslog_prio_from_level(const details::log_msg &msg) const
+    {
+        return _priorities[static_cast<int>(msg.level)];
+    }
+};
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/spdlog.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/spdlog.h b/inc/spdlog/spdlog.h
new file mode 100644
index 0000000..5cec562
--- /dev/null
+++ b/inc/spdlog/spdlog.h
@@ -0,0 +1,155 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+
+// spdlog main header file.
+//see example.cpp for usage example
+
+#pragma once
+
+#include "tweakme.h"
+#include "common.h"
+#include "logger.h"
+
+namespace spdlog
+{
+// Return an existing logger or nullptr if a logger with such name doesn't 
exist.
+// Examples:
+//
+// spdlog::get("mylog")->info("Hello");
+// auto logger = spdlog::get("mylog");
+// logger.info("This is another message" , x, y, z);
+// logger.info() << "This is another message" << x << y << z;
+std::shared_ptr<logger> get(const std::string& name);
+
+//
+// Set global formatting
+// example: spdlog::set_pattern("%Y-%m-%d %H:%M:%S.%e %l : %v");
+//
+void set_pattern(const std::string& format_string);
+void set_formatter(formatter_ptr f);
+
+//
+// Set global logging level for
+//
+void set_level(level::level_enum log_level);
+
+//
+// Turn on async mode (off by default) and set the queue size for each 
async_logger.
+// effective only for loggers created after this call.
+// queue_size: size of queue (must be power of 2):
+//    Each logger will pre-allocate a dedicated queue with queue_size entries 
upon construction.
+//
+// async_overflow_policy (optional, block_retry by default):
+//    async_overflow_policy::block_retry - if queue is full, block until queue 
has room for the new log entry.
+//    async_overflow_policy::discard_log_msg - never block and discard any new 
messages when queue  overflows.
+//
+// worker_warmup_cb (optional):
+//     callback function that will be called in worker thread upon start (can 
be used to init stuff like thread affinity)
+//
+void set_async_mode(size_t queue_size, const async_overflow_policy 
overflow_policy = async_overflow_policy::block_retry, const 
std::function<void()>& worker_warmup_cb = nullptr, const 
std::chrono::milliseconds& flush_interval_ms = 
std::chrono::milliseconds::zero());
+
+// Turn off async mode
+void set_sync_mode();
+
+//
+// Create and register multi/single threaded rotating file logger
+//
+std::shared_ptr<logger> rotating_logger_mt(const std::string& logger_name, 
const std::string& filename, size_t max_file_size, size_t max_files, bool 
force_flush = false);
+std::shared_ptr<logger> rotating_logger_st(const std::string& logger_name, 
const std::string& filename, size_t max_file_size, size_t max_files, bool 
force_flush = false);
+
+//
+// Create file logger which creates new file on the given time (default in  
midnight):
+//
+std::shared_ptr<logger> daily_logger_mt(const std::string& logger_name, const 
std::string& filename, int hour=0, int minute=0, bool force_flush = false);
+std::shared_ptr<logger> daily_logger_st(const std::string& logger_name, const 
std::string& filename, int hour=0, int minute=0, bool force_flush = false);
+
+
+//
+// Create and register stdout/stderr loggers
+//
+std::shared_ptr<logger> stdout_logger_mt(const std::string& logger_name);
+std::shared_ptr<logger> stdout_logger_st(const std::string& logger_name);
+std::shared_ptr<logger> stderr_logger_mt(const std::string& logger_name);
+std::shared_ptr<logger> stderr_logger_st(const std::string& logger_name);
+
+
+//
+// Create and register a syslog logger
+//
+#ifdef __linux__
+std::shared_ptr<logger> syslog_logger(const std::string& logger_name, const 
std::string& ident = "", int syslog_option = 0);
+#endif
+
+
+// Create and register a logger with multiple sinks
+std::shared_ptr<logger> create(const std::string& logger_name, sinks_init_list 
sinks);
+template<class It>
+std::shared_ptr<logger> create(const std::string& logger_name, const It& 
sinks_begin, const It& sinks_end);
+
+
+// Create and register a logger with templated sink type
+// Example: spdlog::create<daily_file_sink_st>("mylog", "dailylog_filename", 
"txt");
+template <typename Sink, typename... Args>
+std::shared_ptr<spdlog::logger> create(const std::string& logger_name, const 
Args&...);
+
+
+// Register the given logger with the given name
+void register_logger(std::shared_ptr<logger> logger);
+
+// Drop the reference to the given logger
+void drop(const std::string &name);
+
+// Drop all references
+void drop_all();
+
+
+///////////////////////////////////////////////////////////////////////////////
+//
+// Macros to be display source file & line
+// Trace & Debug can be switched on/off at compile time for zero cost debug 
statements.
+// Uncomment SPDLOG_DEBUG_ON/SPDLOG_TRACE_ON in teakme.h to enable.
+//
+// Example:
+// spdlog::set_level(spdlog::level::debug);
+// SPDLOG_DEBUG(my_logger, "Some debug message {} {}", 1, 3.2);
+///////////////////////////////////////////////////////////////////////////////
+
+#ifdef SPDLOG_TRACE_ON
+#define SPDLOG_TRACE(logger, ...) logger->trace(__VA_ARGS__) << " (" << 
__FILE__ << " #" << __LINE__ <<")";
+#else
+#define SPDLOG_TRACE(logger, ...)
+#endif
+
+#ifdef SPDLOG_DEBUG_ON
+#define SPDLOG_DEBUG(logger, ...) logger->debug(__VA_ARGS__)  << " (" << 
__FILE__ << " #" << __LINE__ <<")";
+#else
+#define SPDLOG_DEBUG(logger, ...)
+#endif
+
+
+}
+
+
+#include "details/spdlog_impl.h"

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/tweakme.h
----------------------------------------------------------------------
diff --git a/inc/spdlog/tweakme.h b/inc/spdlog/tweakme.h
new file mode 100644
index 0000000..b651658
--- /dev/null
+++ b/inc/spdlog/tweakme.h
@@ -0,0 +1,74 @@
+/*************************************************************************/
+/* spdlog - an extremely fast and easy to use c++11 logging library.     */
+/* Copyright (c) 2014 Gabi Melman.                                       */
+/*                                                                       */
+/* Permission is hereby granted, free of charge, to any person obtaining */
+/* a copy of this software and associated documentation files (the       */
+/* "Software"), to deal in the Software without restriction, including   */
+/* without limitation the rights to use, copy, modify, merge, publish,   */
+/* distribute, sublicense, and/or sell copies of the Software, and to    */
+/* permit persons to whom the Software is furnished to do so, subject to */
+/* the following conditions:                                             */
+/*                                                                       */
+/* The above copyright notice and this permission notice shall be        */
+/* included in all copies or substantial portions of the Software.       */
+/*                                                                       */
+/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,       */
+/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF    */
+/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/
+/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY  */
+/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,  */
+/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE     */
+/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.                */
+/*************************************************************************/
+
+
+#pragma once
+
+///////////////////////////////////////////////////////////////////////////////
+// Edit this file to squeeze every last drop of performance out of spdlog.
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Under Linux, the much faster CLOCK_REALTIME_COARSE clock can be used.
+// This clock is less accurate - can be off by dozens of millis - depending on 
the kernel HZ.
+// Uncomment to use it instead of the regular (but slower) clock.
+// #define SPDLOG_CLOCK_COARSE
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment if date/time logging is not needed.
+// This will prevent spdlog from quering the clock on each log call.
+// #define SPDLOG_NO_DATETIME
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment if thread id logging is not needed (i.e. no %t in the log 
pattern).
+// This will prevent spdlog from quering the thread id on each log call.
+// #define SPDLOG_NO_THREAD_ID
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment if logger name logging is not needed.
+// This will prevent spdlog from copying the logger name  on each log call.
+// #define SPDLOG_NO_NAME
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment to enable the SPDLOG_DEBUG/SPDLOG_TRACE macros.
+// #define SPDLOG_DEBUG_ON
+// #define SPDLOG_TRACE_ON
+///////////////////////////////////////////////////////////////////////////////
+
+
+///////////////////////////////////////////////////////////////////////////////
+// Uncomment to avoid locking in the registry operations (spdlog::get(), 
spdlog::drop() spdlog::register()).
+// Use only if your code never modifes concurrently the registry.
+// Note that upon creating a logger the registry is modified by spdlog..
+// #define SPDLOG_NO_REGISTRY_MUTEX
+///////////////////////////////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/main/MiNiFiMain.cpp
----------------------------------------------------------------------
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
new file mode 100644
index 0000000..40e5930
--- /dev/null
+++ b/main/MiNiFiMain.cpp
@@ -0,0 +1,33 @@
+/**
+ * @file MiNiFiMain.cpp 
+ * MiNiFiMain implementation 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "FlowFileRecord.h"
+#include "Logger.h"
+
+int main(int argc, char **argv)
+{
+       Logger *logger = Logger::getLogger();
+
+       logger->log_info("MiNiFi started");
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/Connection.cpp
----------------------------------------------------------------------
diff --git a/src/Connection.cpp b/src/Connection.cpp
new file mode 100644
index 0000000..2befdfd
--- /dev/null
+++ b/src/Connection.cpp
@@ -0,0 +1,146 @@
+/**
+ * @file Connection.cpp
+ * Connection class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "Connection.h"
+
+Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t 
destUUID)
+: _name(name)
+{
+       if (!uuid)
+               // Generate the global UUID for the flow record
+               uuid_generate(_uuid);
+       else
+               uuid_copy(_uuid, uuid);
+
+       if (srcUUID)
+               uuid_copy(_srcUUID, srcUUID);
+       if (destUUID)
+               uuid_copy(_destUUID, destUUID);
+
+       _srcProcessor = NULL;
+       _destProcessor = NULL;
+       _maxQueueSize = 0;
+       _maxQueueDataSize = 0;
+       _expiredDuration = 0;
+       _queuedDataSize = 0;
+
+       _logger = Logger::getLogger();
+
+       _logger->log_info("Connection %s created", _name.c_str());
+}
+
+bool Connection::isEmpty()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       return _queue.empty();
+}
+
+bool Connection::isFull()
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       if (_maxQueueSize <= 0 && _maxQueueDataSize <= 0)
+               // No back pressure setting
+               return false;
+
+       if (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize)
+               return true;
+
+       if (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize)
+               return true;
+
+       return false;
+}
+
+void Connection::put(FlowFileRecord *flow)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _queue.push(flow);
+
+       _queuedDataSize += flow->getSize();
+
+       _logger->log_debug("Enqueue flow file UUID %s to connection %s",
+                       flow->getUUIDStr().c_str(), _name.c_str());
+}
+
+FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> 
&expiredFlowRecords)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       while (!_queue.empty())
+       {
+               FlowFileRecord *item = _queue.front();
+               _queue.pop();
+               _queuedDataSize -= item->getSize();
+
+               if (_expiredDuration > 0)
+               {
+                       // We need to check for flow expiration
+                       if (getTimeMillis() > (item->getEntryDate() + 
_expiredDuration))
+                       {
+                               // Flow record expired
+                               expiredFlowRecords.insert(item);
+                       }
+                       else
+                       {
+                               // Flow record not expired
+                               if (item->isPenalized())
+                               {
+                                       // Flow record was penalized
+                                       _queue.push(item);
+                                       _queuedDataSize += item->getSize();
+                                       break;
+                               }
+                               item->setOriginalConnection(this);
+                               _logger->log_debug("Dequeue flow file UUID %s 
from connection %s",
+                                               item->getUUIDStr().c_str(), 
_name.c_str());
+                               return item;
+                       }
+               }
+               else
+               {
+                       // Flow record not expired
+                       if (item->isPenalized())
+                       {
+                               // Flow record was penalized
+                               _queue.push(item);
+                               _queuedDataSize += item->getSize();
+                               break;
+                       }
+                       item->setOriginalConnection(this);
+                       _logger->log_debug("Dequeue flow file UUID %s from 
connection %s",
+                                       item->getUUIDStr().c_str(), 
_name.c_str());
+                       return item;
+               }
+       }
+
+       return NULL;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp
new file mode 100644
index 0000000..62c59ad
--- /dev/null
+++ b/src/FlowFileRecord.cpp
@@ -0,0 +1,210 @@
+/**
+ * @file FlowFileRecord.cpp
+ * Flow file record class implementation 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <sys/time.h>
+#include <time.h>
+
+#include "FlowFileRecord.h"
+#include "Relationship.h"
+#include "Logger.h"
+
+std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0);
+
+FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, 
ResourceClaim *claim)
+: _size(0),
+  _id(_localFlowSeqNumber.load()),
+  _offset(0),
+  _penaltyExpirationMs(0),
+  _claim(claim),
+  _markedDelete(false),
+  _connection(NULL),
+  _orginalConnection(NULL)
+{
+       _entryDate = getTimeMillis();
+       _lineageStartDate = _entryDate;
+
+       char uuidStr[37];
+
+       // Generate the global UUID for the flow record
+       uuid_generate(_uuid);
+       // Increase the local ID for the flow record
+       ++_localFlowSeqNumber;
+       uuid_parse(uuidStr, _uuid);
+       _uuidStr = uuidStr;
+
+       // Populate the default attributes
+    addAttribute(FILENAME, std::to_string(getTimeNano()));
+    addAttribute(PATH, DEFAULT_FLOWFILE_PATH);
+    addAttribute(UUID, uuidStr);
+       // Populate the attributes from the input
+    std::map<std::string, std::string>::iterator it;
+    for (it = attributes.begin(); it!= attributes.end(); it++)
+    {
+       addAttribute(it->first, it->second);
+    }
+
+       if (_claim)
+               // Increase the flow file record owned count for the resource 
claim
+               _claim->increaseFlowFileRecordOwnedCount();
+       _logger = Logger::getLogger();
+}
+
+FlowFileRecord::~FlowFileRecord()
+{
+       _logger->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str());
+       if (_claim)
+               // Decrease the flow file record owned count for the resource 
claim
+               _claim->decreaseFlowFileRecordOwnedCount();
+}
+
+bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value)
+{
+       const char *keyStr = FlowAttributeKey(key);
+       if (keyStr)
+       {
+               std::string keyString = keyStr;
+               return addAttribute(keyString, value);
+       }
+       else
+       {
+               return false;
+       }
+}
+
+bool FlowFileRecord::addAttribute(std::string key, std::string value)
+{
+       std::map<std::string, std::string>::iterator it = _attributes.find(key);
+       if (it != _attributes.end())
+       {
+               // attribute already there in the map
+               return false;
+       }
+       else
+       {
+               _attributes[key] = value;
+               return true;
+       }
+}
+
+bool FlowFileRecord::removeAttribute(FlowAttribute key)
+{
+       const char *keyStr = FlowAttributeKey(key);
+       if (keyStr)
+       {
+               std::string keyString = keyStr;
+               return removeAttribute(keyString);
+       }
+       else
+       {
+               return false;
+       }
+}
+
+bool FlowFileRecord::removeAttribute(std::string key)
+{
+       std::map<std::string, std::string>::iterator it = _attributes.find(key);
+       if (it != _attributes.end())
+       {
+               _attributes.erase(key);
+               return true;
+       }
+       else
+       {
+               return false;
+       }
+}
+
+bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value)
+{
+       const char *keyStr = FlowAttributeKey(key);
+       if (keyStr)
+       {
+               std::string keyString = keyStr;
+               return updateAttribute(keyString, value);
+       }
+       else
+       {
+               return false;
+       }
+}
+
+bool FlowFileRecord::updateAttribute(std::string key, std::string value)
+{
+       std::map<std::string, std::string>::iterator it = _attributes.find(key);
+       if (it != _attributes.end())
+       {
+               _attributes[key] = value;
+               return true;
+       }
+       else
+       {
+               return false;
+       }
+}
+
+bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value)
+{
+       const char *keyStr = FlowAttributeKey(key);
+       if (keyStr)
+       {
+               std::string keyString = keyStr;
+               return getAttribute(keyString, value);
+       }
+       else
+       {
+               return false;
+       }
+}
+
+bool FlowFileRecord::getAttribute(std::string key, std::string &value)
+{
+       std::map<std::string, std::string>::iterator it = _attributes.find(key);
+       if (it != _attributes.end())
+       {
+               value = it->second;
+               return true;
+       }
+       else
+       {
+               return false;
+       }
+}
+
+void FlowFileRecord::duplicate(FlowFileRecord *original)
+{
+       uuid_copy(this->_uuid, original->_uuid);
+       this->_attributes = original->_attributes;
+       this->_entryDate = original->_entryDate;
+       this->_id = original->_id;
+       this->_lastQueueDate = original->_lastQueueDate;
+       this->_lineageStartDate = original->_lineageStartDate;
+       this->_offset = original->_offset;
+       this->_penaltyExpirationMs = original->_penaltyExpirationMs;
+       this->_size = original->_size;
+       this->_lineageIdentifiers = original->_lineageIdentifiers;
+       this->_orginalConnection = original->_orginalConnection;
+
+       this->_claim = original->_claim;
+       if (this->_claim)
+               this->_claim->increaseFlowFileRecordOwnedCount();
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/Logger.cpp
----------------------------------------------------------------------
diff --git a/src/Logger.cpp b/src/Logger.cpp
new file mode 100644
index 0000000..984f609
--- /dev/null
+++ b/src/Logger.cpp
@@ -0,0 +1,27 @@
+/**
+ * @file Logger.cpp
+ * Logger class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "Logger.h"
+
+Logger *Logger::_logger(NULL);
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
new file mode 100644
index 0000000..3cd7ba5
--- /dev/null
+++ b/src/ProcessSession.cpp
@@ -0,0 +1,526 @@
+/**
+ * @file ProcessSession.cpp
+ * ProcessSession class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <iostream>
+
+#include "ProcessSession.h"
+
+FlowFileRecord* ProcessSession::create()
+{
+       std::map<std::string, std::string> empty;
+       FlowFileRecord *record = new FlowFileRecord(empty);
+
+       if (record)
+       {
+               _addedFlowFiles[record->getUUIDStr()] = record;
+               _logger->log_debug("Create FlowFile with UUID %s", 
record->getUUIDStr().c_str());
+       }
+
+       return record;
+}
+
+FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
+{
+       FlowFileRecord *record = this->create();
+       if (record)
+       {
+               // Copy attributes
+               std::map<std::string, std::string> parentAttributes = 
parent->getAttributes();
+           std::map<std::string, std::string>::iterator it;
+           for (it = parentAttributes.begin(); it!= parentAttributes.end(); 
it++)
+           {
+               if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
+                               it->first == FlowAttributeKey(DISCARD_REASON) ||
+                                       it->first == FlowAttributeKey(UUID))
+                       // Do not copy special attributes from parent
+                       continue;
+               record->setAttribute(it->first, it->second);
+           }
+           record->_lineageStartDate = parent->_lineageStartDate;
+           record->_lineageIdentifiers = parent->_lineageIdentifiers;
+           record->_lineageIdentifiers.insert(parent->_uuidStr);
+
+       }
+       return record;
+}
+
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
+{
+       FlowFileRecord *record = this->create(parent);
+       if (record)
+       {
+               // Copy Resource Claim
+               record->_claim = parent->_claim;
+               if (record->_claim)
+               {
+                       record->_offset = parent->_offset;
+                       record->_size = parent->_size;
+                       record->_claim->increaseFlowFileRecordOwnedCount();
+               }
+       }
+       return record;
+}
+
+FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, 
long size)
+{
+       FlowFileRecord *record = this->create(parent);
+       if (record)
+       {
+               if (parent->_claim)
+               {
+                       if ((offset + size) > parent->_size)
+                       {
+                               // Set offset and size
+                               _logger->log_error("clone offset %d and size %d 
exceed parent size %d",
+                                               offset, size, parent->_size);
+                               // Remove the Add FlowFile for the session
+                               std::map<std::string, FlowFileRecord 
*>::iterator it =
+                                               
this->_addedFlowFiles.find(record->getUUIDStr());
+                               if (it != this->_addedFlowFiles.end())
+                                       
this->_addedFlowFiles.erase(record->getUUIDStr());
+                               delete record;
+                               return NULL;
+                       }
+                       record->_offset = parent->_offset + parent->_offset;
+                       record->_size = size;
+                       // Copy Resource Claim
+                       record->_claim = parent->_claim;
+                       record->_claim->increaseFlowFileRecordOwnedCount();
+               }
+       }
+       return record;
+}
+
+void ProcessSession::remove(FlowFileRecord *flow)
+{
+       flow->_markedDelete = true;
+       _deletedFlowFiles[flow->getUUIDStr()] = flow;
+}
+
+void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, 
std::string value)
+{
+       flow->setAttribute(key, value);
+}
+
+void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
+{
+       flow->removeAttribute(key);
+}
+
+void ProcessSession::penalize(FlowFileRecord *flow)
+{
+       flow->_penaltyExpirationMs = getTimeMillis() + 
this->_processContext->getProcessor()->getPenalizationPeriodMsec();
+}
+
+void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
+{
+       _transferRelationship[flow->getUUIDStr()] = relationship;
+}
+
+void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback callback)
+{
+       ResourceClaim *claim = NULL;
+
+       claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+
+       try
+       {
+               std::ofstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
+               if (fs.is_open())
+               {
+                       // Call the callback to write the content
+                       callback(&fs);
+                       if (fs.good() && fs.tellp() >= 0)
+                       {
+                               flow->_size = fs.tellp();
+                               flow->_offset = 0;
+                               if (flow->_claim)
+                               {
+                                       // Remove the old claim
+                                       
flow->_claim->decreaseFlowFileRecordOwnedCount();
+                                       flow->_claim = NULL;
+                               }
+                               flow->_claim = claim;
+                               _logger->log_debug("Write offset %d length %d 
into content %s for FlowFile UUID %s",
+                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+                               fs.close();
+                       }
+                       else
+                       {
+                               fs.close();
+                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Write Error");
+                       }
+               }
+               else
+               {
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
+               }
+       }
+       catch (std::exception &exception)
+       {
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception during process session 
write");
+               throw;
+       }
+}
+
+void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback 
callback)
+{
+       ResourceClaim *claim = NULL;
+
+       if (flow->_claim == NULL)
+       {
+               // No existed claim for append, we need to create new claim
+               return write(flow, callback);
+       }
+
+       claim = flow->_claim;
+
+       try
+       {
+               std::ofstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::app);
+               if (fs.is_open())
+               {
+                       // Call the callback to write the content
+                       std::streampos oldPos = fs.tellp();
+                       callback(&fs);
+                       if (fs.good() && fs.tellp() >= 0)
+                       {
+                               uint64_t appendSize = fs.tellp() - oldPos;
+                               flow->_size += appendSize;
+                               _logger->log_debug("Append offset %d extra 
length %d to new size %d into content %s for FlowFile UUID %s",
+                                               flow->_offset, appendSize, 
flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+                               fs.close();
+                       }
+                       else
+                       {
+                               fs.close();
+                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Write Error");
+                       }
+               }
+               else
+               {
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
+               }
+       }
+       catch (std::exception &exception)
+       {
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception during process session 
append");
+               throw;
+       }
+}
+
+void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback callback)
+{
+       try
+       {
+               ResourceClaim *claim = NULL;
+               if (flow->_claim == NULL)
+               {
+                       // No existed claim for read, we throw exception
+                       throw Exception(FILE_OPERATION_EXCEPTION, "No Content 
Claim existed for read");
+               }
+
+               claim = flow->_claim;
+               std::ifstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::in | 
std::fstream::binary);
+               if (fs.is_open())
+               {
+                       fs.seekg(flow->_offset, fs.beg);
+
+                       if (fs.good())
+                       {
+                               callback(&fs);
+                               _logger->log_debug("Read offset %d size %d 
content %s for FlowFile UUID %s",
+                                               flow->_offset, flow->_size, 
claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+                               fs.close();
+                       }
+                       else
+                       {
+                               fs.close();
+                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Read Error");
+                       }
+               }
+               else
+               {
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Open 
Error");
+               }
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session 
read");
+               throw;
+       }
+}
+
+void ProcessSession::commit()
+{
+       try
+       {
+               // First we clone the flow record based on the transfered 
relationship for updated flow record
+               std::map<std::string, FlowFileRecord *>::iterator it;
+               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       if (record->_markedDelete)
+                               continue;
+                       std::map<std::string, Relationship>::iterator 
itRelationship =
+                                       
this->_transferRelationship.find(record->getUUIDStr());
+                       if (itRelationship != _transferRelationship.end())
+                       {
+                               Relationship relationship = 
itRelationship->second;
+                               // Find the relationship, we need to find the 
connections for that relationship
+                               std::set<Connection *> connections =
+                                               
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
+                               if (connections.empty())
+                               {
+                                       // No connection
+                                       if 
(!_processContext->getProcessor()->isAutoTerminated(relationship))
+                                       {
+                                               // Not autoterminate, we should 
have the connect
+                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, "Connect empty for non auto terminated 
relationship");
+                                       }
+                                       else
+                                       {
+                                               // Autoterminated
+                                               record->_markedDelete = true;
+                                       }
+                               }
+                               else
+                               {
+                                       // We connections, clone the flow and 
assign the connection accordingly
+                                       for (std::set<Connection *>::iterator 
itConnection = connections.begin(); itConnection != connections.end(); 
++itConnection)
+                                       {
+                                               Connection 
*connection(*itConnection);
+                                               if (itConnection == 
connections.begin())
+                                               {
+                                                       // First connection 
which the flow need be routed to
+                                                       record->_connection = 
connection;
+                                               }
+                                               else
+                                               {
+                                                       // Clone the flow file 
and route to the connection
+                                                       FlowFileRecord 
*cloneFlow = clone(record);
+                                                       cloneFlow->_connection 
= connection;
+                                                       
this->_clonedFlowFiles[cloneFlow->getUUIDStr()] = cloneFlow;
+                                               }
+                                       }
+                               }
+                       }
+                       else
+                       {
+                               // Can not find relationship for the flow
+                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
+                       }
+               }
+
+               // Do the samething for added flow file
+               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       if (record->_markedDelete)
+                               continue;
+                       std::map<std::string, Relationship>::iterator 
itRelationship =
+                                       
this->_transferRelationship.find(record->getUUIDStr());
+                       if (itRelationship != _transferRelationship.end())
+                       {
+                               Relationship relationship = 
itRelationship->second;
+                               // Find the relationship, we need to find the 
connections for that relationship
+                               std::set<Connection *> connections =
+                                               
_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
+                               if (connections.empty())
+                               {
+                                       // No connection
+                                       if 
(!_processContext->getProcessor()->isAutoTerminated(relationship))
+                                       {
+                                               // Not autoterminate, we should 
have the connect
+                                               throw 
Exception(PROCESS_SESSION_EXCEPTION, "Connect empty for non auto terminated 
relationship");
+                                       }
+                                       else
+                                       {
+                                               // Autoterminated
+                                               record->_markedDelete = true;
+                                       }
+                               }
+                               else
+                               {
+                                       // We connections, clone the flow and 
assign the connection accordingly
+                                       for (std::set<Connection *>::iterator 
itConnection = connections.begin(); itConnection != connections.end(); 
++itConnection)
+                                       {
+                                               Connection 
*connection(*itConnection);
+                                               if (itConnection == 
connections.begin())
+                                               {
+                                                       // First connection 
which the flow need be routed to
+                                                       record->_connection = 
connection;
+                                               }
+                                               else
+                                               {
+                                                       // Clone the flow file 
and route to the connection
+                                                       FlowFileRecord 
*cloneFlow = clone(record);
+                                                       cloneFlow->_connection 
= connection;
+                                                       
this->_clonedFlowFiles[cloneFlow->getUUIDStr()] = cloneFlow;
+                                               }
+                                       }
+                               }
+                       }
+                       else
+                       {
+                               // Can not find relationship for the flow
+                               throw Exception(PROCESS_SESSION_EXCEPTION, "Can 
not find the transfer relationship for the flow");
+                       }
+               }
+
+               // Complete process the added and update flow files for the 
session, send the flow file to its queue
+               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       if (record->_markedDelete)
+                       {
+                               delete record;
+                               continue;
+                       }
+                       if (record->_connection)
+                               record->_connection->put(record);
+               }
+               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       if (record->_markedDelete)
+                       {
+                               delete record;
+                               continue;
+                       }
+                       if (record->_connection)
+                               record->_connection->put(record);
+               }
+               // Process the clone flow files
+               for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       if (record->_markedDelete)
+                       {
+                               delete record;
+                               continue;
+                       }
+                       if (record->_connection)
+                               record->_connection->put(record);
+               }
+               // Delete the deleted flow files
+               for (it = _deletedFlowFiles.begin(); it!= 
_deletedFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       delete record;
+               }
+               // Delete the snapshot
+               for (it = _originalFlowFiles.begin(); it!= 
_originalFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       delete record;
+               }
+               _logger->log_trace("ProcessSession committed for %s", 
_processContext->getProcessor()->getName().c_str());
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session 
commit");
+               throw;
+       }
+}
+
+
+void ProcessSession::rollback()
+{
+       try
+       {
+               std::map<std::string, FlowFileRecord *>::iterator it;
+               // Requeue the snapshot of the flowfile back
+               for (it = _originalFlowFiles.begin(); it!= 
_originalFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       if (record->_orginalConnection)
+                               record->_orginalConnection->put(record);
+                       else
+                               delete record;
+               }
+               // Process the clone flow files
+               for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       delete record;
+               }
+               for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       delete record;
+               }
+               for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
+               {
+                       FlowFileRecord *record = it->second;
+                       delete record;
+               }
+               _logger->log_trace("ProcessSession rollback for %s", 
_processContext->getProcessor()->getName().c_str());
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("Caught Exception %s", exception.what());
+               throw;
+       }
+       catch (...)
+       {
+               _logger->log_debug("Caught Exception during process session 
roll back");
+               throw;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/src/Processor.cpp b/src/Processor.cpp
new file mode 100644
index 0000000..b21a1e3
--- /dev/null
+++ b/src/Processor.cpp
@@ -0,0 +1,363 @@
+/**
+ * @file Processor.cpp
+ * Processor class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+
+#include "Processor.h"
+#include "ProcessContext.h"
+
+Processor::Processor(std::string name, uuid_t uuid)
+: _name(name)
+{
+       if (!uuid)
+               // Generate the global UUID for the flow record
+               uuid_generate(_uuid);
+       else
+               uuid_copy(_uuid, uuid);
+
+       // Setup the default values
+       _state = DISABLED;
+       _strategy = TIMER_DRIVEN;
+       _lossTolerant = false;
+       _triggerWhenEmpty = false;
+       _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
+       _runDurantionNano = 0;
+       _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
+       _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
+       _maxConcurrentTasks = 1;
+       _activeTasks = 0;
+       _yieldExpiration = 0;
+       _logger = Logger::getLogger();
+
+       _logger->log_info("Processor %s created", _name.c_str());
+}
+
+Processor::~Processor()
+{
+
+}
+
+bool Processor::isRunning()
+{
+       return (_state == RUNNING);
+}
+
+bool Processor::setSupportedProperties(std::set<Property> properties)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor property while the 
process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _properties.clear();
+       for (std::set<Property>::iterator it = properties.begin(); it != 
properties.end(); ++it)
+       {
+               Property item(*it);
+               _properties[item.getName()] = item;
+       }
+
+       return true;
+}
+
+bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor supported relationship 
while the process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _relationships.clear();
+       for (std::set<Relationship>::iterator it = relationships.begin(); it != 
relationships.end(); ++it)
+       {
+               Relationship item(*it);
+               _relationships[item.getName()] = item;
+       }
+
+       return true;
+}
+
+bool Processor::setAutoTerminatedRelationships(std::set<Relationship> 
relationships)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor auto terminated 
relationship while the process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       _autoTerminatedRelationships.clear();
+       for (std::set<Relationship>::iterator it = relationships.begin(); it != 
relationships.end(); ++it)
+       {
+               Relationship item(*it);
+               _autoTerminatedRelationships[item.getName()] = item;
+       }
+
+       return true;
+}
+
+bool Processor::isAutoTerminated(Relationship relationship)
+{
+       bool isRun = isRunning();
+
+       if (!isRun)
+               _mtx.lock();
+
+       std::map<std::string, Relationship>::iterator it = 
_autoTerminatedRelationships.find(relationship.getName());
+       if (it != _autoTerminatedRelationships.end())
+       {
+               if (!isRun)
+                       _mtx.unlock();
+               return true;
+       }
+       else
+       {
+               if (!isRun)
+                       _mtx.unlock();
+               return false;
+       }
+}
+
+bool Processor::isSupportedRelationship(Relationship relationship)
+{
+       bool isRun = isRunning();
+
+       if (!isRun)
+               _mtx.lock();
+
+       std::map<std::string, Relationship>::iterator it = 
_relationships.find(relationship.getName());
+       if (it != _relationships.end())
+       {
+               if (!isRun)
+                       _mtx.unlock();
+               return true;
+       }
+       else
+       {
+               if (!isRun)
+                       _mtx.unlock();
+               return false;
+       }
+}
+
+bool Processor::getProperty(std::string name, std::string &value)
+{
+       bool isRun = isRunning();
+
+       if (!isRun)
+               // Because set property only allowed in non running state, we 
need to obtain lock avoid rack condition
+               _mtx.lock();
+
+       std::map<std::string, Property>::iterator it = _properties.find(name);
+       if (it != _properties.end())
+       {
+               Property item = it->second;
+               value = item.getValue();
+               if (!isRun)
+                       _mtx.unlock();
+               return true;
+       }
+       else
+       {
+               if (!isRun)
+                       _mtx.unlock();
+               return false;
+       }
+}
+
+bool Processor::setProperty(std::string name, std::string value)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not set processor property while the 
process %s is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       std::map<std::string, Property>::iterator it = _properties.find(name);
+       if (it != _properties.end())
+       {
+               Property item = it->second;
+               _properties[item.getName()] = item;
+               return true;
+       }
+       else
+       {
+               return false;
+       }
+}
+
+void Processor::yield()
+{
+       _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
+}
+
+std::set<Connection *> Processor::getOutGoingConnections(std::string 
relationship)
+{
+       std::set<Connection *> empty;
+
+       std::map<std::string, std::set<Connection *>>::iterator it = 
_outGoingConnections.find(relationship);
+       if (it != _outGoingConnections.end())
+       {
+               return _outGoingConnections[relationship];
+       }
+       else
+       {
+               return empty;
+       }
+}
+
+bool Processor::addConnection(Connection *connection)
+{
+       bool ret = false;
+
+       if (isRunning())
+       {
+               _logger->log_info("Can not add connection while the process %s 
is running",
+                               _name.c_str());
+               return false;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       uuid_t srcUUID;
+       uuid_t destUUID;
+
+       connection->getSourceProcessorUUID(srcUUID);
+       connection->getDestinationProcessorUUID(destUUID);
+
+       if (uuid_compare(_uuid, destUUID) == 0)
+       {
+               // Connection is destination to the current processor
+               if (_incomingConnections.find(connection) == 
_incomingConnections.end())
+               {
+                       _incomingConnections.insert(connection);
+                       connection->setDestinationProcessor(this);
+                       _logger->log_info("Add connection %s into Processor %s 
incoming connection",
+                                       connection->getName().c_str(), 
_name.c_str());
+                       ret = true;
+               }
+       }
+
+       if (uuid_compare(_uuid, srcUUID) == 0)
+       {
+               std::string relationship = 
connection->getRelationship().getName();
+               // Connection is source from the current processor
+               std::map<std::string, std::set<Connection *>>::iterator it =
+                               _outGoingConnections.find(relationship);
+               if (it != _outGoingConnections.end())
+               {
+                       // We already has connection for this relationship
+                       std::set<Connection *> existedConnection = it->second;
+                       if (existedConnection.find(connection) == 
existedConnection.end())
+                       {
+                               // We do not have the same connection for this 
relationship yet
+                               existedConnection.insert(connection);
+                               connection->setSourceProcessor(this);
+                               _logger->log_info("Add connection %s into 
Processor %s outgoing connection for relationship %s",
+                                                                               
                connection->getName().c_str(), _name.c_str(), 
relationship.c_str());
+                               ret = true;
+                       }
+               }
+               else
+               {
+                       // We do not have any outgoing connection for this 
relationship yet
+                       std::set<Connection *> newConnection;
+                       newConnection.insert(connection);
+                       connection->setSourceProcessor(this);
+                       _outGoingConnections[relationship] = newConnection;
+                       _logger->log_info("Add connection %s into Processor %s 
outgoing connection for relationship %s",
+                                                               
connection->getName().c_str(), _name.c_str(), relationship.c_str());
+                       ret = true;
+               }
+       }
+
+       return ret;
+}
+
+void Processor::removeConnection(Connection *connection)
+{
+       if (isRunning())
+       {
+               _logger->log_info("Can not remove connection while the process 
%s is running",
+                               _name.c_str());
+               return;
+       }
+
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       uuid_t srcUUID;
+       uuid_t destUUID;
+
+       connection->getSourceProcessorUUID(srcUUID);
+       connection->getDestinationProcessorUUID(destUUID);
+
+       if (uuid_compare(_uuid, destUUID) == 0)
+       {
+               // Connection is destination to the current processor
+               if (_incomingConnections.find(connection) != 
_incomingConnections.end())
+               {
+                       _incomingConnections.erase(connection);
+                       connection->setDestinationProcessor(NULL);
+                       _logger->log_info("Remove connection %s into Processor 
%s incoming connection",
+                                       connection->getName().c_str(), 
_name.c_str());
+               }
+       }
+
+       if (uuid_compare(_uuid, srcUUID) == 0)
+       {
+               std::string relationship = 
connection->getRelationship().getName();
+               // Connection is source from the current processor
+               std::map<std::string, std::set<Connection *>>::iterator it =
+                               _outGoingConnections.find(relationship);
+               if (it == _outGoingConnections.end())
+               {
+                       return;
+               }
+               else
+               {
+                       if (_outGoingConnections[relationship].find(connection) 
!= _outGoingConnections[relationship].end())
+                       {
+                               
_outGoingConnections[relationship].erase(connection);
+                               connection->setSourceProcessor(NULL);
+                               _logger->log_info("Remove connection %s into 
Processor %s outgoing connection for relationship %s",
+                                                               
connection->getName().c_str(), _name.c_str(), relationship.c_str());
+                       }
+               }
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/ResourceClaim.cpp
----------------------------------------------------------------------
diff --git a/src/ResourceClaim.cpp b/src/ResourceClaim.cpp
new file mode 100644
index 0000000..5e392e9
--- /dev/null
+++ b/src/ResourceClaim.cpp
@@ -0,0 +1,41 @@
+/**
+ * @file ResourceClaim.cpp
+ * ResourceClaim class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "ResourceClaim.h"
+
+std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0);
+
+ResourceClaim::ResourceClaim(const std::string contentDirectory)
+: _id(_localResourceClaimNumber.load()),
+  _flowFileRecordOwnedCount(0)
+{
+       char uuidStr[37];
+
+       // Generate the global UUID for the resource claim
+       uuid_generate(_uuid);
+       // Increase the local ID for the resource claim
+       ++_localResourceClaimNumber;
+       uuid_parse(uuidStr, _uuid);
+       // Create the full content path for the content
+       _contentFullPath = contentDirectory + "/" + uuidStr;
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/test/FlowFileRecordTest.cpp
----------------------------------------------------------------------
diff --git a/test/FlowFileRecordTest.cpp b/test/FlowFileRecordTest.cpp
new file mode 100644
index 0000000..09a3d33
--- /dev/null
+++ b/test/FlowFileRecordTest.cpp
@@ -0,0 +1,28 @@
+/**
+ * @file MiNiFiMain.cpp 
+ * MiNiFiMain implementation 
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+
+#include "FlowFileRecord.h"
+
+int main(int argc, char **argv)
+{
+}

Reply via email to