Repository: nifi-minifi-cpp Updated Branches: refs/heads/MINIFI-6 c3444a6b7 -> 31c9a1b61 (forced update)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/null_sink.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/sinks/null_sink.h b/inc/spdlog/sinks/null_sink.h new file mode 100644 index 0000000..992b3b7 --- /dev/null +++ b/inc/spdlog/sinks/null_sink.h @@ -0,0 +1,52 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once +#include <mutex> +#include "./base_sink.h" +#include "../details/null_mutex.h" + + +namespace spdlog +{ +namespace sinks +{ + +template <class Mutex> +class null_sink : public base_sink < Mutex > +{ +protected: + void _sink_it(const details::log_msg&) override + {} + + void flush() override + {} + +}; +typedef null_sink<details::null_mutex> null_sink_st; +typedef null_sink<std::mutex> null_sink_mt; + +} +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/ostream_sink.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/sinks/ostream_sink.h b/inc/spdlog/sinks/ostream_sink.h new file mode 100644 index 0000000..f2fe3b2 --- /dev/null +++ b/inc/spdlog/sinks/ostream_sink.h @@ -0,0 +1,67 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#include <ostream> +#include <mutex> +#include <memory> + +#include "../details/null_mutex.h" +#include "./base_sink.h" + +namespace spdlog +{ +namespace sinks +{ +template<class Mutex> +class ostream_sink: public base_sink<Mutex> +{ +public: + explicit ostream_sink(std::ostream& os, bool force_flush=false) :_ostream(os), _force_flush(force_flush) {} + ostream_sink(const ostream_sink&) = delete; + ostream_sink& operator=(const ostream_sink&) = delete; + virtual ~ostream_sink() = default; + +protected: + void _sink_it(const details::log_msg& msg) override + { + _ostream.write(msg.formatted.data(), msg.formatted.size()); + if (_force_flush) + _ostream.flush(); + } + + void flush() override + { + _ostream.flush(); + } + + std::ostream& _ostream; + bool _force_flush; +}; + +typedef ostream_sink<std::mutex> ostream_sink_mt; +typedef ostream_sink<details::null_mutex> ostream_sink_st; +} +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/sink.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/sinks/sink.h b/inc/spdlog/sinks/sink.h new file mode 100644 index 0000000..88c423a --- /dev/null +++ b/inc/spdlog/sinks/sink.h @@ -0,0 +1,42 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#include "../details/log_msg.h" + +namespace spdlog +{ +namespace sinks +{ +class sink +{ +public: + virtual ~sink() {} + virtual void log(const details::log_msg& msg) = 0; + virtual void flush() = 0; +}; +} +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/stdout_sinks.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/sinks/stdout_sinks.h b/inc/spdlog/sinks/stdout_sinks.h new file mode 100644 index 0000000..5ad06c2 --- /dev/null +++ b/inc/spdlog/sinks/stdout_sinks.h @@ -0,0 +1,71 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#include <iostream> +#include <mutex> +#include "./ostream_sink.h" +#include "../details/null_mutex.h" + +namespace spdlog +{ +namespace sinks +{ + +template <class Mutex> +class stdout_sink : public ostream_sink<Mutex> +{ + using MyType = stdout_sink<Mutex>; +public: + stdout_sink() : ostream_sink<Mutex>(std::cout, true) {} + static std::shared_ptr<MyType> instance() + { + static std::shared_ptr<MyType> instance = std::make_shared<MyType>(); + return instance; + } +}; + +typedef stdout_sink<details::null_mutex> stdout_sink_st; +typedef stdout_sink<std::mutex> stdout_sink_mt; + + +template <class Mutex> +class stderr_sink : public ostream_sink<Mutex> +{ + using MyType = stderr_sink<Mutex>; +public: + stderr_sink() : ostream_sink<Mutex>(std::cerr, true) {} + static std::shared_ptr<MyType> instance() + { + static std::shared_ptr<MyType> instance = std::make_shared<MyType>(); + return instance; + } + +}; + +typedef stderr_sink<std::mutex> stderr_sink_mt; +typedef stderr_sink<details::null_mutex> stderr_sink_st; +} +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/sinks/syslog_sink.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/sinks/syslog_sink.h b/inc/spdlog/sinks/syslog_sink.h new file mode 100644 index 0000000..37b6513 --- /dev/null +++ b/inc/spdlog/sinks/syslog_sink.h @@ -0,0 +1,102 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + +#pragma once + +#ifdef __linux__ + +#include <array> +#include <string> +#include <syslog.h> + +#include "./sink.h" +#include "../common.h" +#include "../details/log_msg.h" + + +namespace spdlog +{ +namespace sinks +{ +/** + * Sink that write to syslog using the `syscall()` library call. + * + * Locking is not needed, as `syslog()` itself is thread-safe. + */ +class syslog_sink : public sink +{ +public: + // + syslog_sink(const std::string& ident = "", int syslog_option=0, int syslog_facility=LOG_USER): + _ident(ident) + { + _priorities[static_cast<int>(level::trace)] = LOG_DEBUG; + _priorities[static_cast<int>(level::debug)] = LOG_DEBUG; + _priorities[static_cast<int>(level::info)] = LOG_INFO; + _priorities[static_cast<int>(level::notice)] = LOG_NOTICE; + _priorities[static_cast<int>(level::warn)] = LOG_WARNING; + _priorities[static_cast<int>(level::err)] = LOG_ERR; + _priorities[static_cast<int>(level::critical)] = LOG_CRIT; + _priorities[static_cast<int>(level::alert)] = LOG_ALERT; + _priorities[static_cast<int>(level::emerg)] = LOG_EMERG; + _priorities[static_cast<int>(level::off)] = LOG_INFO; + + //set ident to be program name if empty + ::openlog(_ident.empty()? nullptr:_ident.c_str(), syslog_option, syslog_facility); + } + ~syslog_sink() + { + ::closelog(); + } + + syslog_sink(const syslog_sink&) = delete; + syslog_sink& operator=(const syslog_sink&) = delete; + + void log(const details::log_msg &msg) override + { + ::syslog(syslog_prio_from_level(msg), "%s", msg.formatted.str().c_str()); + } + + void flush() override + { + } + + +private: + std::array<int, 10> _priorities; + //must store the ident because the man says openlog might use the pointer as is and not a string copy + const std::string _ident; + + // + // Simply maps spdlog's log level to syslog priority level. + // + int syslog_prio_from_level(const details::log_msg &msg) const + { + return _priorities[static_cast<int>(msg.level)]; + } +}; +} +} + +#endif http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/spdlog.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/spdlog.h b/inc/spdlog/spdlog.h new file mode 100644 index 0000000..5cec562 --- /dev/null +++ b/inc/spdlog/spdlog.h @@ -0,0 +1,155 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + + +// spdlog main header file. +//see example.cpp for usage example + +#pragma once + +#include "tweakme.h" +#include "common.h" +#include "logger.h" + +namespace spdlog +{ +// Return an existing logger or nullptr if a logger with such name doesn't exist. +// Examples: +// +// spdlog::get("mylog")->info("Hello"); +// auto logger = spdlog::get("mylog"); +// logger.info("This is another message" , x, y, z); +// logger.info() << "This is another message" << x << y << z; +std::shared_ptr<logger> get(const std::string& name); + +// +// Set global formatting +// example: spdlog::set_pattern("%Y-%m-%d %H:%M:%S.%e %l : %v"); +// +void set_pattern(const std::string& format_string); +void set_formatter(formatter_ptr f); + +// +// Set global logging level for +// +void set_level(level::level_enum log_level); + +// +// Turn on async mode (off by default) and set the queue size for each async_logger. +// effective only for loggers created after this call. +// queue_size: size of queue (must be power of 2): +// Each logger will pre-allocate a dedicated queue with queue_size entries upon construction. +// +// async_overflow_policy (optional, block_retry by default): +// async_overflow_policy::block_retry - if queue is full, block until queue has room for the new log entry. +// async_overflow_policy::discard_log_msg - never block and discard any new messages when queue overflows. +// +// worker_warmup_cb (optional): +// callback function that will be called in worker thread upon start (can be used to init stuff like thread affinity) +// +void set_async_mode(size_t queue_size, const async_overflow_policy overflow_policy = async_overflow_policy::block_retry, const std::function<void()>& worker_warmup_cb = nullptr, const std::chrono::milliseconds& flush_interval_ms = std::chrono::milliseconds::zero()); + +// Turn off async mode +void set_sync_mode(); + +// +// Create and register multi/single threaded rotating file logger +// +std::shared_ptr<logger> rotating_logger_mt(const std::string& logger_name, const std::string& filename, size_t max_file_size, size_t max_files, bool force_flush = false); +std::shared_ptr<logger> rotating_logger_st(const std::string& logger_name, const std::string& filename, size_t max_file_size, size_t max_files, bool force_flush = false); + +// +// Create file logger which creates new file on the given time (default in midnight): +// +std::shared_ptr<logger> daily_logger_mt(const std::string& logger_name, const std::string& filename, int hour=0, int minute=0, bool force_flush = false); +std::shared_ptr<logger> daily_logger_st(const std::string& logger_name, const std::string& filename, int hour=0, int minute=0, bool force_flush = false); + + +// +// Create and register stdout/stderr loggers +// +std::shared_ptr<logger> stdout_logger_mt(const std::string& logger_name); +std::shared_ptr<logger> stdout_logger_st(const std::string& logger_name); +std::shared_ptr<logger> stderr_logger_mt(const std::string& logger_name); +std::shared_ptr<logger> stderr_logger_st(const std::string& logger_name); + + +// +// Create and register a syslog logger +// +#ifdef __linux__ +std::shared_ptr<logger> syslog_logger(const std::string& logger_name, const std::string& ident = "", int syslog_option = 0); +#endif + + +// Create and register a logger with multiple sinks +std::shared_ptr<logger> create(const std::string& logger_name, sinks_init_list sinks); +template<class It> +std::shared_ptr<logger> create(const std::string& logger_name, const It& sinks_begin, const It& sinks_end); + + +// Create and register a logger with templated sink type +// Example: spdlog::create<daily_file_sink_st>("mylog", "dailylog_filename", "txt"); +template <typename Sink, typename... Args> +std::shared_ptr<spdlog::logger> create(const std::string& logger_name, const Args&...); + + +// Register the given logger with the given name +void register_logger(std::shared_ptr<logger> logger); + +// Drop the reference to the given logger +void drop(const std::string &name); + +// Drop all references +void drop_all(); + + +/////////////////////////////////////////////////////////////////////////////// +// +// Macros to be display source file & line +// Trace & Debug can be switched on/off at compile time for zero cost debug statements. +// Uncomment SPDLOG_DEBUG_ON/SPDLOG_TRACE_ON in teakme.h to enable. +// +// Example: +// spdlog::set_level(spdlog::level::debug); +// SPDLOG_DEBUG(my_logger, "Some debug message {} {}", 1, 3.2); +/////////////////////////////////////////////////////////////////////////////// + +#ifdef SPDLOG_TRACE_ON +#define SPDLOG_TRACE(logger, ...) logger->trace(__VA_ARGS__) << " (" << __FILE__ << " #" << __LINE__ <<")"; +#else +#define SPDLOG_TRACE(logger, ...) +#endif + +#ifdef SPDLOG_DEBUG_ON +#define SPDLOG_DEBUG(logger, ...) logger->debug(__VA_ARGS__) << " (" << __FILE__ << " #" << __LINE__ <<")"; +#else +#define SPDLOG_DEBUG(logger, ...) +#endif + + +} + + +#include "details/spdlog_impl.h" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/inc/spdlog/tweakme.h ---------------------------------------------------------------------- diff --git a/inc/spdlog/tweakme.h b/inc/spdlog/tweakme.h new file mode 100644 index 0000000..b651658 --- /dev/null +++ b/inc/spdlog/tweakme.h @@ -0,0 +1,74 @@ +/*************************************************************************/ +/* spdlog - an extremely fast and easy to use c++11 logging library. */ +/* Copyright (c) 2014 Gabi Melman. */ +/* */ +/* Permission is hereby granted, free of charge, to any person obtaining */ +/* a copy of this software and associated documentation files (the */ +/* "Software"), to deal in the Software without restriction, including */ +/* without limitation the rights to use, copy, modify, merge, publish, */ +/* distribute, sublicense, and/or sell copies of the Software, and to */ +/* permit persons to whom the Software is furnished to do so, subject to */ +/* the following conditions: */ +/* */ +/* The above copyright notice and this permission notice shall be */ +/* included in all copies or substantial portions of the Software. */ +/* */ +/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */ +/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */ +/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.*/ +/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */ +/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */ +/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */ +/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +/*************************************************************************/ + + +#pragma once + +/////////////////////////////////////////////////////////////////////////////// +// Edit this file to squeeze every last drop of performance out of spdlog. +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Under Linux, the much faster CLOCK_REALTIME_COARSE clock can be used. +// This clock is less accurate - can be off by dozens of millis - depending on the kernel HZ. +// Uncomment to use it instead of the regular (but slower) clock. +// #define SPDLOG_CLOCK_COARSE +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment if date/time logging is not needed. +// This will prevent spdlog from quering the clock on each log call. +// #define SPDLOG_NO_DATETIME +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment if thread id logging is not needed (i.e. no %t in the log pattern). +// This will prevent spdlog from quering the thread id on each log call. +// #define SPDLOG_NO_THREAD_ID +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment if logger name logging is not needed. +// This will prevent spdlog from copying the logger name on each log call. +// #define SPDLOG_NO_NAME +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment to enable the SPDLOG_DEBUG/SPDLOG_TRACE macros. +// #define SPDLOG_DEBUG_ON +// #define SPDLOG_TRACE_ON +/////////////////////////////////////////////////////////////////////////////// + + +/////////////////////////////////////////////////////////////////////////////// +// Uncomment to avoid locking in the registry operations (spdlog::get(), spdlog::drop() spdlog::register()). +// Use only if your code never modifes concurrently the registry. +// Note that upon creating a logger the registry is modified by spdlog.. +// #define SPDLOG_NO_REGISTRY_MUTEX +/////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/main/MiNiFiMain.cpp ---------------------------------------------------------------------- diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp new file mode 100644 index 0000000..40e5930 --- /dev/null +++ b/main/MiNiFiMain.cpp @@ -0,0 +1,33 @@ +/** + * @file MiNiFiMain.cpp + * MiNiFiMain implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "FlowFileRecord.h" +#include "Logger.h" + +int main(int argc, char **argv) +{ + Logger *logger = Logger::getLogger(); + + logger->log_info("MiNiFi started"); + +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/src/Connection.cpp b/src/Connection.cpp new file mode 100644 index 0000000..2befdfd --- /dev/null +++ b/src/Connection.cpp @@ -0,0 +1,146 @@ +/** + * @file Connection.cpp + * Connection class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <iostream> + +#include "Connection.h" + +Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID) +: _name(name) +{ + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(_uuid); + else + uuid_copy(_uuid, uuid); + + if (srcUUID) + uuid_copy(_srcUUID, srcUUID); + if (destUUID) + uuid_copy(_destUUID, destUUID); + + _srcProcessor = NULL; + _destProcessor = NULL; + _maxQueueSize = 0; + _maxQueueDataSize = 0; + _expiredDuration = 0; + _queuedDataSize = 0; + + _logger = Logger::getLogger(); + + _logger->log_info("Connection %s created", _name.c_str()); +} + +bool Connection::isEmpty() +{ + std::lock_guard<std::mutex> lock(_mtx); + + return _queue.empty(); +} + +bool Connection::isFull() +{ + std::lock_guard<std::mutex> lock(_mtx); + + if (_maxQueueSize <= 0 && _maxQueueDataSize <= 0) + // No back pressure setting + return false; + + if (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize) + return true; + + if (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize) + return true; + + return false; +} + +void Connection::put(FlowFileRecord *flow) +{ + std::lock_guard<std::mutex> lock(_mtx); + + _queue.push(flow); + + _queuedDataSize += flow->getSize(); + + _logger->log_debug("Enqueue flow file UUID %s to connection %s", + flow->getUUIDStr().c_str(), _name.c_str()); +} + +FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords) +{ + std::lock_guard<std::mutex> lock(_mtx); + + while (!_queue.empty()) + { + FlowFileRecord *item = _queue.front(); + _queue.pop(); + _queuedDataSize -= item->getSize(); + + if (_expiredDuration > 0) + { + // We need to check for flow expiration + if (getTimeMillis() > (item->getEntryDate() + _expiredDuration)) + { + // Flow record expired + expiredFlowRecords.insert(item); + } + else + { + // Flow record not expired + if (item->isPenalized()) + { + // Flow record was penalized + _queue.push(item); + _queuedDataSize += item->getSize(); + break; + } + item->setOriginalConnection(this); + _logger->log_debug("Dequeue flow file UUID %s from connection %s", + item->getUUIDStr().c_str(), _name.c_str()); + return item; + } + } + else + { + // Flow record not expired + if (item->isPenalized()) + { + // Flow record was penalized + _queue.push(item); + _queuedDataSize += item->getSize(); + break; + } + item->setOriginalConnection(this); + _logger->log_debug("Dequeue flow file UUID %s from connection %s", + item->getUUIDStr().c_str(), _name.c_str()); + return item; + } + } + + return NULL; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp new file mode 100644 index 0000000..62c59ad --- /dev/null +++ b/src/FlowFileRecord.cpp @@ -0,0 +1,210 @@ +/** + * @file FlowFileRecord.cpp + * Flow file record class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <sys/time.h> +#include <time.h> + +#include "FlowFileRecord.h" +#include "Relationship.h" +#include "Logger.h" + +std::atomic<uint64_t> FlowFileRecord::_localFlowSeqNumber(0); + +FlowFileRecord::FlowFileRecord(std::map<std::string, std::string> attributes, ResourceClaim *claim) +: _size(0), + _id(_localFlowSeqNumber.load()), + _offset(0), + _penaltyExpirationMs(0), + _claim(claim), + _markedDelete(false), + _connection(NULL), + _orginalConnection(NULL) +{ + _entryDate = getTimeMillis(); + _lineageStartDate = _entryDate; + + char uuidStr[37]; + + // Generate the global UUID for the flow record + uuid_generate(_uuid); + // Increase the local ID for the flow record + ++_localFlowSeqNumber; + uuid_parse(uuidStr, _uuid); + _uuidStr = uuidStr; + + // Populate the default attributes + addAttribute(FILENAME, std::to_string(getTimeNano())); + addAttribute(PATH, DEFAULT_FLOWFILE_PATH); + addAttribute(UUID, uuidStr); + // Populate the attributes from the input + std::map<std::string, std::string>::iterator it; + for (it = attributes.begin(); it!= attributes.end(); it++) + { + addAttribute(it->first, it->second); + } + + if (_claim) + // Increase the flow file record owned count for the resource claim + _claim->increaseFlowFileRecordOwnedCount(); + _logger = Logger::getLogger(); +} + +FlowFileRecord::~FlowFileRecord() +{ + _logger->log_debug("Delete FlowFile UUID %s", _uuidStr.c_str()); + if (_claim) + // Decrease the flow file record owned count for the resource claim + _claim->decreaseFlowFileRecordOwnedCount(); +} + +bool FlowFileRecord::addAttribute(FlowAttribute key, std::string value) +{ + const char *keyStr = FlowAttributeKey(key); + if (keyStr) + { + std::string keyString = keyStr; + return addAttribute(keyString, value); + } + else + { + return false; + } +} + +bool FlowFileRecord::addAttribute(std::string key, std::string value) +{ + std::map<std::string, std::string>::iterator it = _attributes.find(key); + if (it != _attributes.end()) + { + // attribute already there in the map + return false; + } + else + { + _attributes[key] = value; + return true; + } +} + +bool FlowFileRecord::removeAttribute(FlowAttribute key) +{ + const char *keyStr = FlowAttributeKey(key); + if (keyStr) + { + std::string keyString = keyStr; + return removeAttribute(keyString); + } + else + { + return false; + } +} + +bool FlowFileRecord::removeAttribute(std::string key) +{ + std::map<std::string, std::string>::iterator it = _attributes.find(key); + if (it != _attributes.end()) + { + _attributes.erase(key); + return true; + } + else + { + return false; + } +} + +bool FlowFileRecord::updateAttribute(FlowAttribute key, std::string value) +{ + const char *keyStr = FlowAttributeKey(key); + if (keyStr) + { + std::string keyString = keyStr; + return updateAttribute(keyString, value); + } + else + { + return false; + } +} + +bool FlowFileRecord::updateAttribute(std::string key, std::string value) +{ + std::map<std::string, std::string>::iterator it = _attributes.find(key); + if (it != _attributes.end()) + { + _attributes[key] = value; + return true; + } + else + { + return false; + } +} + +bool FlowFileRecord::getAttribute(FlowAttribute key, std::string &value) +{ + const char *keyStr = FlowAttributeKey(key); + if (keyStr) + { + std::string keyString = keyStr; + return getAttribute(keyString, value); + } + else + { + return false; + } +} + +bool FlowFileRecord::getAttribute(std::string key, std::string &value) +{ + std::map<std::string, std::string>::iterator it = _attributes.find(key); + if (it != _attributes.end()) + { + value = it->second; + return true; + } + else + { + return false; + } +} + +void FlowFileRecord::duplicate(FlowFileRecord *original) +{ + uuid_copy(this->_uuid, original->_uuid); + this->_attributes = original->_attributes; + this->_entryDate = original->_entryDate; + this->_id = original->_id; + this->_lastQueueDate = original->_lastQueueDate; + this->_lineageStartDate = original->_lineageStartDate; + this->_offset = original->_offset; + this->_penaltyExpirationMs = original->_penaltyExpirationMs; + this->_size = original->_size; + this->_lineageIdentifiers = original->_lineageIdentifiers; + this->_orginalConnection = original->_orginalConnection; + + this->_claim = original->_claim; + if (this->_claim) + this->_claim->increaseFlowFileRecordOwnedCount(); +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/Logger.cpp ---------------------------------------------------------------------- diff --git a/src/Logger.cpp b/src/Logger.cpp new file mode 100644 index 0000000..984f609 --- /dev/null +++ b/src/Logger.cpp @@ -0,0 +1,27 @@ +/** + * @file Logger.cpp + * Logger class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "Logger.h" + +Logger *Logger::_logger(NULL); + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp new file mode 100644 index 0000000..3cd7ba5 --- /dev/null +++ b/src/ProcessSession.cpp @@ -0,0 +1,526 @@ +/** + * @file ProcessSession.cpp + * ProcessSession class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <iostream> + +#include "ProcessSession.h" + +FlowFileRecord* ProcessSession::create() +{ + std::map<std::string, std::string> empty; + FlowFileRecord *record = new FlowFileRecord(empty); + + if (record) + { + _addedFlowFiles[record->getUUIDStr()] = record; + _logger->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); + } + + return record; +} + +FlowFileRecord* ProcessSession::create(FlowFileRecord *parent) +{ + FlowFileRecord *record = this->create(); + if (record) + { + // Copy attributes + std::map<std::string, std::string> parentAttributes = parent->getAttributes(); + std::map<std::string, std::string>::iterator it; + for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++) + { + if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) || + it->first == FlowAttributeKey(DISCARD_REASON) || + it->first == FlowAttributeKey(UUID)) + // Do not copy special attributes from parent + continue; + record->setAttribute(it->first, it->second); + } + record->_lineageStartDate = parent->_lineageStartDate; + record->_lineageIdentifiers = parent->_lineageIdentifiers; + record->_lineageIdentifiers.insert(parent->_uuidStr); + + } + return record; +} + +FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent) +{ + FlowFileRecord *record = this->create(parent); + if (record) + { + // Copy Resource Claim + record->_claim = parent->_claim; + if (record->_claim) + { + record->_offset = parent->_offset; + record->_size = parent->_size; + record->_claim->increaseFlowFileRecordOwnedCount(); + } + } + return record; +} + +FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size) +{ + FlowFileRecord *record = this->create(parent); + if (record) + { + if (parent->_claim) + { + if ((offset + size) > parent->_size) + { + // Set offset and size + _logger->log_error("clone offset %d and size %d exceed parent size %d", + offset, size, parent->_size); + // Remove the Add FlowFile for the session + std::map<std::string, FlowFileRecord *>::iterator it = + this->_addedFlowFiles.find(record->getUUIDStr()); + if (it != this->_addedFlowFiles.end()) + this->_addedFlowFiles.erase(record->getUUIDStr()); + delete record; + return NULL; + } + record->_offset = parent->_offset + parent->_offset; + record->_size = size; + // Copy Resource Claim + record->_claim = parent->_claim; + record->_claim->increaseFlowFileRecordOwnedCount(); + } + } + return record; +} + +void ProcessSession::remove(FlowFileRecord *flow) +{ + flow->_markedDelete = true; + _deletedFlowFiles[flow->getUUIDStr()] = flow; +} + +void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value) +{ + flow->setAttribute(key, value); +} + +void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key) +{ + flow->removeAttribute(key); +} + +void ProcessSession::penalize(FlowFileRecord *flow) +{ + flow->_penaltyExpirationMs = getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec(); +} + +void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship) +{ + _transferRelationship[flow->getUUIDStr()] = relationship; +} + +void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback callback) +{ + ResourceClaim *claim = NULL; + + claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + + try + { + std::ofstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + if (fs.is_open()) + { + // Call the callback to write the content + callback(&fs); + if (fs.good() && fs.tellp() >= 0) + { + flow->_size = fs.tellp(); + flow->_offset = 0; + if (flow->_claim) + { + // Remove the old claim + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + flow->_claim = claim; + _logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + fs.close(); + } + else + { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } + catch (std::exception &exception) + { + if (claim) + delete claim; + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (claim) + delete claim; + _logger->log_debug("Caught Exception during process session write"); + throw; + } +} + +void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback callback) +{ + ResourceClaim *claim = NULL; + + if (flow->_claim == NULL) + { + // No existed claim for append, we need to create new claim + return write(flow, callback); + } + + claim = flow->_claim; + + try + { + std::ofstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app); + if (fs.is_open()) + { + // Call the callback to write the content + std::streampos oldPos = fs.tellp(); + callback(&fs); + if (fs.good() && fs.tellp() >= 0) + { + uint64_t appendSize = fs.tellp() - oldPos; + flow->_size += appendSize; + _logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", + flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + fs.close(); + } + else + { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } + catch (std::exception &exception) + { + if (claim) + delete claim; + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + if (claim) + delete claim; + _logger->log_debug("Caught Exception during process session append"); + throw; + } +} + +void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback callback) +{ + try + { + ResourceClaim *claim = NULL; + if (flow->_claim == NULL) + { + // No existed claim for read, we throw exception + throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read"); + } + + claim = flow->_claim; + std::ifstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary); + if (fs.is_open()) + { + fs.seekg(flow->_offset, fs.beg); + + if (fs.good()) + { + callback(&fs); + _logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", + flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + fs.close(); + } + else + { + fs.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error"); + } + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + _logger->log_debug("Caught Exception during process session read"); + throw; + } +} + +void ProcessSession::commit() +{ + try + { + // First we clone the flow record based on the transfered relationship for updated flow record + std::map<std::string, FlowFileRecord *>::iterator it; + for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + continue; + std::map<std::string, Relationship>::iterator itRelationship = + this->_transferRelationship.find(record->getUUIDStr()); + if (itRelationship != _transferRelationship.end()) + { + Relationship relationship = itRelationship->second; + // Find the relationship, we need to find the connections for that relationship + std::set<Connection *> connections = + _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); + if (connections.empty()) + { + // No connection + if (!_processContext->getProcessor()->isAutoTerminated(relationship)) + { + // Not autoterminate, we should have the connect + throw Exception(PROCESS_SESSION_EXCEPTION, "Connect empty for non auto terminated relationship"); + } + else + { + // Autoterminated + record->_markedDelete = true; + } + } + else + { + // We connections, clone the flow and assign the connection accordingly + for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) + { + Connection *connection(*itConnection); + if (itConnection == connections.begin()) + { + // First connection which the flow need be routed to + record->_connection = connection; + } + else + { + // Clone the flow file and route to the connection + FlowFileRecord *cloneFlow = clone(record); + cloneFlow->_connection = connection; + this->_clonedFlowFiles[cloneFlow->getUUIDStr()] = cloneFlow; + } + } + } + } + else + { + // Can not find relationship for the flow + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); + } + } + + // Do the samething for added flow file + for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + continue; + std::map<std::string, Relationship>::iterator itRelationship = + this->_transferRelationship.find(record->getUUIDStr()); + if (itRelationship != _transferRelationship.end()) + { + Relationship relationship = itRelationship->second; + // Find the relationship, we need to find the connections for that relationship + std::set<Connection *> connections = + _processContext->getProcessor()->getOutGoingConnections(relationship.getName()); + if (connections.empty()) + { + // No connection + if (!_processContext->getProcessor()->isAutoTerminated(relationship)) + { + // Not autoterminate, we should have the connect + throw Exception(PROCESS_SESSION_EXCEPTION, "Connect empty for non auto terminated relationship"); + } + else + { + // Autoterminated + record->_markedDelete = true; + } + } + else + { + // We connections, clone the flow and assign the connection accordingly + for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection) + { + Connection *connection(*itConnection); + if (itConnection == connections.begin()) + { + // First connection which the flow need be routed to + record->_connection = connection; + } + else + { + // Clone the flow file and route to the connection + FlowFileRecord *cloneFlow = clone(record); + cloneFlow->_connection = connection; + this->_clonedFlowFiles[cloneFlow->getUUIDStr()] = cloneFlow; + } + } + } + } + else + { + // Can not find relationship for the flow + throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow"); + } + } + + // Complete process the added and update flow files for the session, send the flow file to its queue + for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + { + delete record; + continue; + } + if (record->_connection) + record->_connection->put(record); + } + for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + { + delete record; + continue; + } + if (record->_connection) + record->_connection->put(record); + } + // Process the clone flow files + for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_markedDelete) + { + delete record; + continue; + } + if (record->_connection) + record->_connection->put(record); + } + // Delete the deleted flow files + for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + // Delete the snapshot + for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + _logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str()); + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + _logger->log_debug("Caught Exception during process session commit"); + throw; + } +} + + +void ProcessSession::rollback() +{ + try + { + std::map<std::string, FlowFileRecord *>::iterator it; + // Requeue the snapshot of the flowfile back + for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + if (record->_orginalConnection) + record->_orginalConnection->put(record); + else + delete record; + } + // Process the clone flow files + for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++) + { + FlowFileRecord *record = it->second; + delete record; + } + _logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str()); + } + catch (std::exception &exception) + { + _logger->log_debug("Caught Exception %s", exception.what()); + throw; + } + catch (...) + { + _logger->log_debug("Caught Exception during process session roll back"); + throw; + } +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/src/Processor.cpp b/src/Processor.cpp new file mode 100644 index 0000000..b21a1e3 --- /dev/null +++ b/src/Processor.cpp @@ -0,0 +1,363 @@ +/** + * @file Processor.cpp + * Processor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> +#include <set> +#include <sys/time.h> +#include <time.h> +#include <chrono> +#include <thread> + +#include "Processor.h" +#include "ProcessContext.h" + +Processor::Processor(std::string name, uuid_t uuid) +: _name(name) +{ + if (!uuid) + // Generate the global UUID for the flow record + uuid_generate(_uuid); + else + uuid_copy(_uuid, uuid); + + // Setup the default values + _state = DISABLED; + _strategy = TIMER_DRIVEN; + _lossTolerant = false; + _triggerWhenEmpty = false; + _schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS; + _runDurantionNano = 0; + _yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000; + _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; + _maxConcurrentTasks = 1; + _activeTasks = 0; + _yieldExpiration = 0; + _logger = Logger::getLogger(); + + _logger->log_info("Processor %s created", _name.c_str()); +} + +Processor::~Processor() +{ + +} + +bool Processor::isRunning() +{ + return (_state == RUNNING); +} + +bool Processor::setSupportedProperties(std::set<Property> properties) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor property while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + _properties.clear(); + for (std::set<Property>::iterator it = properties.begin(); it != properties.end(); ++it) + { + Property item(*it); + _properties[item.getName()] = item; + } + + return true; +} + +bool Processor::setSupportedRelationships(std::set<Relationship> relationships) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor supported relationship while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + _relationships.clear(); + for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) + { + Relationship item(*it); + _relationships[item.getName()] = item; + } + + return true; +} + +bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor auto terminated relationship while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + _autoTerminatedRelationships.clear(); + for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it) + { + Relationship item(*it); + _autoTerminatedRelationships[item.getName()] = item; + } + + return true; +} + +bool Processor::isAutoTerminated(Relationship relationship) +{ + bool isRun = isRunning(); + + if (!isRun) + _mtx.lock(); + + std::map<std::string, Relationship>::iterator it = _autoTerminatedRelationships.find(relationship.getName()); + if (it != _autoTerminatedRelationships.end()) + { + if (!isRun) + _mtx.unlock(); + return true; + } + else + { + if (!isRun) + _mtx.unlock(); + return false; + } +} + +bool Processor::isSupportedRelationship(Relationship relationship) +{ + bool isRun = isRunning(); + + if (!isRun) + _mtx.lock(); + + std::map<std::string, Relationship>::iterator it = _relationships.find(relationship.getName()); + if (it != _relationships.end()) + { + if (!isRun) + _mtx.unlock(); + return true; + } + else + { + if (!isRun) + _mtx.unlock(); + return false; + } +} + +bool Processor::getProperty(std::string name, std::string &value) +{ + bool isRun = isRunning(); + + if (!isRun) + // Because set property only allowed in non running state, we need to obtain lock avoid rack condition + _mtx.lock(); + + std::map<std::string, Property>::iterator it = _properties.find(name); + if (it != _properties.end()) + { + Property item = it->second; + value = item.getValue(); + if (!isRun) + _mtx.unlock(); + return true; + } + else + { + if (!isRun) + _mtx.unlock(); + return false; + } +} + +bool Processor::setProperty(std::string name, std::string value) +{ + if (isRunning()) + { + _logger->log_info("Can not set processor property while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + std::map<std::string, Property>::iterator it = _properties.find(name); + if (it != _properties.end()) + { + Property item = it->second; + _properties[item.getName()] = item; + return true; + } + else + { + return false; + } +} + +void Processor::yield() +{ + _yieldExpiration = (getTimeMillis() + _yieldPeriodMsec); +} + +std::set<Connection *> Processor::getOutGoingConnections(std::string relationship) +{ + std::set<Connection *> empty; + + std::map<std::string, std::set<Connection *>>::iterator it = _outGoingConnections.find(relationship); + if (it != _outGoingConnections.end()) + { + return _outGoingConnections[relationship]; + } + else + { + return empty; + } +} + +bool Processor::addConnection(Connection *connection) +{ + bool ret = false; + + if (isRunning()) + { + _logger->log_info("Can not add connection while the process %s is running", + _name.c_str()); + return false; + } + + std::lock_guard<std::mutex> lock(_mtx); + + uuid_t srcUUID; + uuid_t destUUID; + + connection->getSourceProcessorUUID(srcUUID); + connection->getDestinationProcessorUUID(destUUID); + + if (uuid_compare(_uuid, destUUID) == 0) + { + // Connection is destination to the current processor + if (_incomingConnections.find(connection) == _incomingConnections.end()) + { + _incomingConnections.insert(connection); + connection->setDestinationProcessor(this); + _logger->log_info("Add connection %s into Processor %s incoming connection", + connection->getName().c_str(), _name.c_str()); + ret = true; + } + } + + if (uuid_compare(_uuid, srcUUID) == 0) + { + std::string relationship = connection->getRelationship().getName(); + // Connection is source from the current processor + std::map<std::string, std::set<Connection *>>::iterator it = + _outGoingConnections.find(relationship); + if (it != _outGoingConnections.end()) + { + // We already has connection for this relationship + std::set<Connection *> existedConnection = it->second; + if (existedConnection.find(connection) == existedConnection.end()) + { + // We do not have the same connection for this relationship yet + existedConnection.insert(connection); + connection->setSourceProcessor(this); + _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), _name.c_str(), relationship.c_str()); + ret = true; + } + } + else + { + // We do not have any outgoing connection for this relationship yet + std::set<Connection *> newConnection; + newConnection.insert(connection); + connection->setSourceProcessor(this); + _outGoingConnections[relationship] = newConnection; + _logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), _name.c_str(), relationship.c_str()); + ret = true; + } + } + + return ret; +} + +void Processor::removeConnection(Connection *connection) +{ + if (isRunning()) + { + _logger->log_info("Can not remove connection while the process %s is running", + _name.c_str()); + return; + } + + std::lock_guard<std::mutex> lock(_mtx); + + uuid_t srcUUID; + uuid_t destUUID; + + connection->getSourceProcessorUUID(srcUUID); + connection->getDestinationProcessorUUID(destUUID); + + if (uuid_compare(_uuid, destUUID) == 0) + { + // Connection is destination to the current processor + if (_incomingConnections.find(connection) != _incomingConnections.end()) + { + _incomingConnections.erase(connection); + connection->setDestinationProcessor(NULL); + _logger->log_info("Remove connection %s into Processor %s incoming connection", + connection->getName().c_str(), _name.c_str()); + } + } + + if (uuid_compare(_uuid, srcUUID) == 0) + { + std::string relationship = connection->getRelationship().getName(); + // Connection is source from the current processor + std::map<std::string, std::set<Connection *>>::iterator it = + _outGoingConnections.find(relationship); + if (it == _outGoingConnections.end()) + { + return; + } + else + { + if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end()) + { + _outGoingConnections[relationship].erase(connection); + connection->setSourceProcessor(NULL); + _logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s", + connection->getName().c_str(), _name.c_str(), relationship.c_str()); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/src/ResourceClaim.cpp ---------------------------------------------------------------------- diff --git a/src/ResourceClaim.cpp b/src/ResourceClaim.cpp new file mode 100644 index 0000000..5e392e9 --- /dev/null +++ b/src/ResourceClaim.cpp @@ -0,0 +1,41 @@ +/** + * @file ResourceClaim.cpp + * ResourceClaim class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "ResourceClaim.h" + +std::atomic<uint64_t> ResourceClaim::_localResourceClaimNumber(0); + +ResourceClaim::ResourceClaim(const std::string contentDirectory) +: _id(_localResourceClaimNumber.load()), + _flowFileRecordOwnedCount(0) +{ + char uuidStr[37]; + + // Generate the global UUID for the resource claim + uuid_generate(_uuid); + // Increase the local ID for the resource claim + ++_localResourceClaimNumber; + uuid_parse(uuidStr, _uuid); + // Create the full content path for the content + _contentFullPath = contentDirectory + "/" + uuidStr; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/62394f72/test/FlowFileRecordTest.cpp ---------------------------------------------------------------------- diff --git a/test/FlowFileRecordTest.cpp b/test/FlowFileRecordTest.cpp new file mode 100644 index 0000000..09a3d33 --- /dev/null +++ b/test/FlowFileRecordTest.cpp @@ -0,0 +1,28 @@ +/** + * @file MiNiFiMain.cpp + * MiNiFiMain implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <vector> +#include <queue> +#include <map> + +#include "FlowFileRecord.h" + +int main(int argc, char **argv) +{ +}