Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 955f7ab51 -> b3848a319
MINIFI-69: Add GetFile and TailFile processor This closes #5. Signed-off-by: Aldrin Piri <ald...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b3848a31 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b3848a31 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b3848a31 Branch: refs/heads/master Commit: b3848a3192f349023ef209b943519f7812e3faca Parents: 955f7ab Author: Bin Qiu <benqiu2...@gmail.com> Authored: Tue Aug 2 17:16:14 2016 -0400 Committer: Aldrin Piri <ald...@apache.org> Committed: Tue Aug 2 17:16:14 2016 -0400 ---------------------------------------------------------------------- conf/flowGetFile.xml | 127 ++++++++++++++++++++ conf/flowTailFile.xml | 101 ++++++++++++++++ inc/FlowController.h | 2 + inc/GetFile.h | 114 ++++++++++++++++++ inc/ProcessSession.h | 2 +- inc/TailFile.h | 93 +++++++++++++++ src/FlowController.cpp | 8 ++ src/GetFile.cpp | 277 ++++++++++++++++++++++++++++++++++++++++++++ src/ProcessSession.cpp | 16 ++- src/TailFile.cpp | 272 +++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 1010 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/conf/flowGetFile.xml ---------------------------------------------------------------------- diff --git a/conf/flowGetFile.xml b/conf/flowGetFile.xml new file mode 100644 index 0000000..25369fb --- /dev/null +++ b/conf/flowGetFile.xml @@ -0,0 +1,127 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + <id>9347f92c-3dcc-4ece-ba97-d67eed846a39</id> + 
<name>GetFile</name> + <position x="2495.369384765625" y="749.25244140625"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.GetFile</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>5 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>STOPPED</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Input Directory</name> + <value>/Users/binqiu/GetFile</value> + </property> + <property> + <name>File Filter</name> + <value>[^\.].*</value> + </property> + <property> + <name>Path Filter</name> + </property> + <property> + <name>Batch Size</name> + <value>10</value> + </property> + <property> + <name>Keep Source File</name> + <value>true</value> + </property> + <property> + <name>Recurse Subdirectories</name> + <value>true</value> + </property> + <property> + <name>Polling Interval</name> + <value>0 sec</value> + </property> + <property> + <name>Ignore Hidden Files</name> + <value>true</value> + </property> + <property> + <name>Minimum File Age</name> + <value>0 sec</value> + </property> + <property> + <name>Maximum File Age</name> + </property> + <property> + <name>Minimum File Size</name> + <value>0 B</value> + </property> + <property> + <name>Maximum File Size</name> + </property> + </processor> + <processor> + <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id> + <name>LogAttribute</name> + <position x="3239.369384765625" y="822.25244140625"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + 
<scheduledState>STOPPED</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Log Level</name> + <value>info</value> + </property> + <property> + <name>Log Payload</name> + <value>false</value> + </property> + <property> + <name>Attributes to Log</name> + </property> + <property> + <name>Attributes to Ignore</name> + </property> + <property> + <name>Log prefix</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <connection> + <id>d02db6bf-8c6f-463b-b07e-47f7ab726502</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>9347f92c-3dcc-4ece-ba97-d67eed846a39</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/conf/flowTailFile.xml ---------------------------------------------------------------------- diff --git a/conf/flowTailFile.xml b/conf/flowTailFile.xml new file mode 100644 index 0000000..022bc4e --- /dev/null +++ b/conf/flowTailFile.xml @@ -0,0 +1,101 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<flowController> + <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount> + <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount> + <rootGroup> + <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id> + <name>NiFi Flow</name> + <position x="0.0" y="0.0"/> + <comment/> + <processor> + 
<id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id> + <name>LogAttribute</name> + <position x="3239.369384765625" y="822.25244140625"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.LogAttribute</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>0 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>Log Level</name> + <value>info</value> + </property> + <property> + <name>Log Payload</name> + <value>false</value> + </property> + <property> + <name>Attributes to Log</name> + </property> + <property> + <name>Attributes to Ignore</name> + </property> + <property> + <name>Log prefix</name> + </property> + <autoTerminatedRelationship>success</autoTerminatedRelationship> + </processor> + <processor> + <id>c6ced523-be07-48d3-90c2-82b87d208f2e</id> + <name>TailFile</name> + <position x="2629.369384765625" y="761.25244140625"/> + <styles/> + <comment/> + <class>org.apache.nifi.processors.standard.TailFile</class> + <maxConcurrentTasks>1</maxConcurrentTasks> + <schedulingPeriod>1 sec</schedulingPeriod> + <penalizationPeriod>30 sec</penalizationPeriod> + <yieldPeriod>1 sec</yieldPeriod> + <bulletinLevel>WARN</bulletinLevel> + <lossTolerant>false</lossTolerant> + <scheduledState>RUNNING</scheduledState> + <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> + <runDurationNanos>0</runDurationNanos> + <property> + <name>File to Tail</name> + <value>/Users/binqiu/getFile/log.txt</value> + </property> + <property> + <name>Rolling Filename Pattern</name> + </property> + <property> + <name>State File</name> + <value>log.state</value> + </property> + <property> + <name>Initial Start Position</name> + <value>Beginning of File</value> + </property> + 
<property> + <name>File Location</name> + <value>Local</value> + </property> + </processor> + <connection> + <id>12899241-005e-4d98-b3fa-7e00b1840bac</id> + <name/> + <bendPoints/> + <labelIndex>1</labelIndex> + <zIndex>0</zIndex> + <sourceId>c6ced523-be07-48d3-90c2-82b87d208f2e</sourceId> + <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId> + <sourceType>PROCESSOR</sourceType> + <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId> + <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId> + <destinationType>PROCESSOR</destinationType> + <relationship>success</relationship> + <maxWorkQueueSize>0</maxWorkQueueSize> + <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize> + <flowFileExpiration>0 sec</flowFileExpiration> + </connection> + </rootGroup> + <controllerServices/> + <reportingTasks/> +</flowController> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/FlowController.h ---------------------------------------------------------------------- diff --git a/inc/FlowController.h b/inc/FlowController.h index 3895096..3dd0952 100644 --- a/inc/FlowController.h +++ b/inc/FlowController.h @@ -46,6 +46,8 @@ #include "TimerDrivenSchedulingAgent.h" #include "FlowControlProtocol.h" #include "RemoteProcessorGroupPort.h" +#include "GetFile.h" +#include "TailFile.h" //! Default NiFi Root Group Name #define DEFAULT_ROOT_GROUP_NAME "" http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/GetFile.h ---------------------------------------------------------------------- diff --git a/inc/GetFile.h b/inc/GetFile.h new file mode 100644 index 0000000..333813d --- /dev/null +++ b/inc/GetFile.h @@ -0,0 +1,114 @@ +/** + * @file GetFile.h + * GetFile class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GET_FILE_H__
+#define __GET_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! GetFile Class
+class GetFile : public Processor
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new processor
+	 */
+	GetFile(std::string name, uuid_t uuid = NULL)
+	: Processor(name, uuid)
+	{
+		_logger = Logger::getLogger();
+		_directory = ".";
+		_recursive = true;
+		_keepSourceFile = false;
+		_minAge = 0;
+		_maxAge = 0;
+		_minSize = 0;
+		_maxSize = 0;
+		_ignoreHiddenFile = true;
+		_pollInterval = 0;
+		_batchSize = 10;
+		_lastDirectoryListingTime = getTimeMillis();
+	}
+	//! Destructor
+	virtual ~GetFile()
+	{
+	}
+	//! Processor Name
+	static const std::string ProcessorName;
+	//! Supported Properties
+	static Property Directory;
+	static Property Recurse;
+	static Property KeepSourceFile;
+	static Property MinAge;
+	static Property MaxAge;
+	static Property MinSize;
+	static Property MaxSize;
+	static Property IgnoreHiddenFile;
+	static Property PollInterval;
+	static Property BatchSize;
+	//! Supported Relationships
+	static Relationship Success;
+
+public:
+	//! OnTrigger method, implemented by NiFi GetFile
+	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+	//! Initialize, overridden by NiFi GetFile
+	virtual void initialize(void);
+	//! Perform a directory listing of dir, descending into subdirectories when Recurse is set
+	void performListing(std::string dir);
+
+protected:
+
+private:
+	//! Logger
+	Logger *_logger;
+	//! Queue holding the full paths of files found by the directory listing
+	std::queue<std::string> _dirList;
+	//! Get Listing size
+	uint64_t getListingSize() {
+		std::lock_guard<std::mutex> lock(_mtx);
+		return _dirList.size();
+	}
+	//! Whether the directory listing is empty
+	bool isListingEmpty();
+	//! Put full path file name into directory listing
+	void putListing(std::string fileName);
+	//! Poll directory listing for files
+	void pollListing(std::queue<std::string> &list, int maxSize);
+	//! Check whether file can be added to the directory listing
+	bool acceptFile(std::string fileName);
+	//! Mutex for protection of the directory listing
+	std::mutex _mtx;
+	std::string _directory;
+	bool _recursive;
+	bool _keepSourceFile;
+	int64_t _minAge; //!< milliseconds; 0 disables the check
+	int64_t _maxAge; //!< milliseconds; 0 disables the check
+	int64_t _minSize; //!< compared against st_size (bytes); 0 disables the check
+	int64_t _maxSize; //!< compared against st_size (bytes); 0 disables the check
+	bool _ignoreHiddenFile;
+	int64_t _pollInterval; //!< milliseconds; 0 lists whenever the queue is empty
+	int64_t _batchSize; //!< max files taken per onTrigger; 0 means no limit
+	uint64_t _lastDirectoryListingTime; //!< getTimeMillis() reference point for PollInterval
+};
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/ProcessSession.h
----------------------------------------------------------------------
diff --git a/inc/ProcessSession.h b/inc/ProcessSession.h
index d38e71b..c8ec3a5 100644
--- a/inc/ProcessSession.h
+++ b/inc/ProcessSession.h
@@ -83,7 +83,7 @@ public:
 	//! Penalize the flow
 	void penalize(FlowFileRecord *flow);
 	//! Import the existed file into the flow
-	void import(std::string source, FlowFileRecord *flow);
+	void import(std::string source, FlowFileRecord *flow, bool keepSource = true, uint64_t offset = 0);
 
 protected:
 	//!
FlowFiles being modified by current process session
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/TailFile.h
----------------------------------------------------------------------
diff --git a/inc/TailFile.h b/inc/TailFile.h
new file mode 100644
index 0000000..5c4ba09
--- /dev/null
+++ b/inc/TailFile.h
@@ -0,0 +1,97 @@
+/**
+ * @file TailFile.h
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TAIL_FILE_H__
+#define __TAIL_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! TailFile Class
+class TailFile : public Processor
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new processor
+	 */
+	TailFile(std::string name, uuid_t uuid = NULL)
+	: Processor(name, uuid)
+	{
+		_logger = Logger::getLogger();
+		_stateRecovered = false;
+		_currentTailFilePosition = 0;
+		_currentTailFileCreatedTime = 0;
+	}
+	//! Destructor
+	virtual ~TailFile()
+	{
+		// Persist only after onTrigger has loaded the configuration and state
+		if (_stateRecovered)
+			storeState();
+	}
+	//! Processor Name
+	static const std::string ProcessorName;
+	//! Supported Properties
+	static Property FileName;
+	static Property StateFile;
+	//! Supported Relationships
+	static Relationship Success;
+
+public:
+	//! OnTrigger method, implemented by NiFi TailFile
+	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+	//! Initialize, overridden by NiFi TailFile
+	virtual void initialize(void);
+	//! recoverState: load FILENAME/POSITION from the state file
+	void recoverState();
+	//! storeState: write FILENAME/POSITION to the state file
+	void storeState();
+
+protected:
+
+private:
+	//! Logger
+	Logger *_logger;
+	std::string _fileLocation;
+	//! Property Specified Tailed File Name
+	std::string _fileName;
+	//! File to save state
+	std::string _stateFile;
+	//! State related to the tailed file
+	std::string _currentTailFileName;
+	uint64_t _currentTailFilePosition; //!< bytes of _currentTailFileName already ingested
+	bool _stateRecovered;
+	uint64_t _currentTailFileCreatedTime;
+	//! Utils functions for parse state file
+	std::string trimLeft(const std::string& s);
+	std::string trimRight(const std::string& s);
+	void parseStateFileLine(char *buf);
+	void checkRollOver();
+
+};
+
+//! Matched File Item for Roll over check
+typedef struct {
+	std::string fileName;
+	uint64_t modifiedTime;
+} TailMatchedFileItem;
+
+#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index b18953e..7f4061e 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -140,6 +140,14 @@ Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
 	{
 		processor = new RealTimeDataCollector(name, uuid);
 	}
+	else if (name == GetFile::ProcessorName)
+	{
+		processor = new GetFile(name, uuid);
+	}
+	else if (name == TailFile::ProcessorName)
+	{
+		processor = new TailFile(name, uuid);
+	}
 	else
 	{
 		_logger->log_error("No Processor defined for %s", name.c_str());
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/src/GetFile.cpp b/src/GetFile.cpp
new file mode 100644
index 0000000..32503f5
--- /dev/null
+++
b/src/GetFile.cpp
@@ -0,0 +1,294 @@
+/**
+ * @file GetFile.cpp
+ * GetFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+
+#include "TimeUtil.h"
+#include "GetFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string GetFile::ProcessorName("GetFile");
+Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
+Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
+Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
+Property GetFile::KeepSourceFile("Keep Source File",
+		"If true, the file is not deleted after it has been copied to the Content Repository", "false");
+Property GetFile::MaxAge("Maximum File Age",
+		"The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec");
+Property GetFile::MinAge("Minimum File Age",
+		"The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec");
+Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
+Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
+Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
+Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
+Relationship GetFile::Success("success", "All files are routed to success");
+
+void GetFile::initialize()
+{
+	//! Set the supported properties
+	std::set<Property> properties;
+	properties.insert(BatchSize);
+	properties.insert(Directory);
+	properties.insert(IgnoreHiddenFile);
+	properties.insert(KeepSourceFile);
+	properties.insert(MaxAge);
+	properties.insert(MinAge);
+	properties.insert(MaxSize);
+	properties.insert(MinSize);
+	properties.insert(PollInterval);
+	properties.insert(Recurse);
+	setSupportedProperties(properties);
+	//! Set the supported relationships
+	std::set<Relationship> relationships;
+	relationships.insert(Success);
+	setSupportedRelationships(relationships);
+}
+
+void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+	if (context->getProperty(Directory.getName(), value))
+	{
+		_directory = value;
+	}
+	if (context->getProperty(BatchSize.getName(), value))
+	{
+		Property::StringToInt(value, _batchSize);
+	}
+	if (context->getProperty(IgnoreHiddenFile.getName(), value))
+	{
+		Property::StringToBool(value, _ignoreHiddenFile);
+	}
+	if (context->getProperty(KeepSourceFile.getName(), value))
+	{
+		Property::StringToBool(value, _keepSourceFile);
+	}
+	if (context->getProperty(MaxAge.getName(), value))
+	{
+		TimeUnit unit;
+		if (Property::StringToTime(value, _maxAge, unit) &&
+			Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge))
+		{
+
+		}
+	}
+	if (context->getProperty(MinAge.getName(), value))
+	{
+		TimeUnit unit;
+		if (Property::StringToTime(value, _minAge, unit) &&
+			Property::ConvertTimeUnitToMS(_minAge, unit, _minAge))
+		{
+
+		}
+	}
+	if (context->getProperty(MaxSize.getName(), value))
+	{
+		Property::StringToInt(value, _maxSize);
+	}
+	if (context->getProperty(MinSize.getName(), value))
+	{
+		Property::StringToInt(value, _minSize);
+	}
+	if (context->getProperty(PollInterval.getName(), value))
+	{
+		TimeUnit unit;
+		if (Property::StringToTime(value, _pollInterval, unit) &&
+			Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval))
+		{
+
+		}
+	}
+	if (context->getProperty(Recurse.getName(), value))
+	{
+		Property::StringToBool(value, _recursive);
+	}
+
+	// Perform directory list
+	if (isListingEmpty())
+	{
+		if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval)
+		{
+			performListing(_directory);
+			// remember when we listed so that PollInterval is measured from the last listing
+			_lastDirectoryListingTime = getTimeMillis();
+		}
+	}
+
+	if (!isListingEmpty())
+	{
+		try
+		{
+			std::queue<std::string> list;
+			pollListing(list, _batchSize);
+			while (!list.empty())
+			{
+				std::string fileName = list.front();
+				list.pop();
+				_logger->log_info("GetFile process %s", fileName.c_str());
+				FlowFileRecord *flowFile = session->create();
+				if (!flowFile)
+					return;
+				std::size_t found = fileName.find_last_of("/\\");
+				std::string path = fileName.substr(0,found);
+				std::string name = fileName.substr(found+1);
+				flowFile->updateAttribute(FILENAME, name);
+				flowFile->updateAttribute(PATH, path);
+				flowFile->addAttribute(ABSOLUTE_PATH, fileName);
+				session->import(fileName, flowFile, _keepSourceFile);
+				session->transfer(flowFile, Success);
+			}
+		}
+		catch (std::exception &exception)
+		{
+			_logger->log_debug("GetFile Caught Exception %s", exception.what());
+			throw;
+		}
+		catch (...)
+		{
+			throw;
+		}
+	}
+}
+
+bool GetFile::isListingEmpty()
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	return _dirList.empty();
+}
+
+void GetFile::putListing(std::string fileName)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	_dirList.push(fileName);
+}
+
+void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
+{
+	std::lock_guard<std::mutex> lock(_mtx);
+
+	while (!_dirList.empty() && (maxSize <= 0 || list.size() < (std::size_t) maxSize))
+	{
+		std::string fileName = _dirList.front();
+		_dirList.pop();
+		list.push(fileName);
+	}
+
+	return;
+}
+
+bool GetFile::acceptFile(std::string fileName)
+{
+	struct stat statbuf;
+
+	if (stat(fileName.c_str(), &statbuf) == 0)
+	{
+		// only regular files can be imported (skip directories, sockets, FIFOs, devices)
+		if (!S_ISREG(statbuf.st_mode))
+			return false;
+
+		if (_minSize > 0 && statbuf.st_size < _minSize)
+			return false;
+
+		if (_maxSize > 0 && statbuf.st_size > _maxSize)
+			return false;
+
+		uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
+		uint64_t fileAge = getTimeMillis() - modifiedTime;
+		if (_minAge > 0 && fileAge < (uint64_t) _minAge)
+			return false;
+		if (_maxAge > 0 && fileAge > (uint64_t) _maxAge)
+			return false;
+
+		// a file is hidden when its base name (not the full path, e.g. "./x") starts with '.'
+		std::size_t slash = fileName.find_last_of("/\\");
+		std::string baseName = (slash == std::string::npos) ? fileName : fileName.substr(slash + 1);
+		if (_ignoreHiddenFile && !baseName.empty() && baseName[0] == '.')
+			return false;
+
+		if (access(fileName.c_str(), R_OK) != 0)
+			return false;
+
+		if (_keepSourceFile == false && access(fileName.c_str(), W_OK) != 0)
+			return false;
+
+		return true;
+	}
+
+	return false;
+}
+
+void GetFile::performListing(std::string dir)
+{
+	DIR *d;
+	d = opendir(dir.c_str());
+	if (!d)
+		return;
+	while (1)
+	{
+		struct dirent *entry;
+		entry = readdir(d);
+		if (!entry)
+			break;
+		std::string d_name = entry->d_name;
+		std::string path = dir + "/" + d_name;
+		// d_type holds one DT_* value, not a bit mask (DT_SOCK & DT_DIR != 0)
+		bool isDirectory = (entry->d_type == DT_DIR);
+		if (entry->d_type == DT_UNKNOWN)
+		{
+			// some file systems do not fill in d_type; fall back to lstat(),
+			// which, like d_type, does not follow symbolic links
+			struct stat statbuf;
+			isDirectory = (lstat(path.c_str(), &statbuf) == 0 && S_ISDIR(statbuf.st_mode));
+		}
+		if (isDirectory)
+		{
+			// if this is a directory
+			if (_recursive && d_name != ".." && d_name != ".")
+			{
+				performListing(path);
+			}
+		}
+		else
+		{
+			if (acceptFile(path))
+			{
+				// check whether we can take this file
+				putListing(path);
+			}
+		}
+	}
+	closedir(d);
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
index 2628ae3..141d1ee 100644
--- a/src/ProcessSession.cpp
+++ b/src/ProcessSession.cpp
@@ -349,7 +349,7 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
 	}
 }
 
-void ProcessSession::import(std::string source, FlowFileRecord *flow)
+void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
 {
 	ResourceClaim *claim = NULL;
 
@@ -368,6 +368,7 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow)
 		if (fs.is_open() && input.is_open())
 		{
 			// Open the source file and stream to the flow file
+			input.seekg(offset, std::ios_base::beg);
 			while (input.good())
 			{
 				input.read(buf, size);
@@ -394,6 +395,8 @@ void ProcessSession::import(std::string source, FlowFileRecord *flow)
 					flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
 			fs.close();
 			input.close();
+			if (!keepSource && std::remove(source.c_str()) != 0)
+				_logger->log_error("ProcessSession failed to remove source file %s", source.c_str());
 		}
 		else
 		{
@@ -610,6 +613,12 @@ void ProcessSession::commit()
 			FlowFileRecord *record = it->second;
 			delete record;
 		}
+		// All done
+		_updatedFlowFiles.clear();
+		_addedFlowFiles.clear();
+
_clonedFlowFiles.clear();
+		_deletedFlowFiles.clear();
+		_originalFlowFiles.clear();
 		_logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
 	}
 	catch (std::exception &exception)
@@ -642,22 +651,27 @@ void ProcessSession::rollback()
 			else
 				delete record;
 		}
+		_originalFlowFiles.clear();
 		// Process the clone flow files
 		for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++)
 		{
 			FlowFileRecord *record = it->second;
 			delete record;
 		}
+		_clonedFlowFiles.clear();
 		for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
 		{
 			FlowFileRecord *record = it->second;
 			delete record;
 		}
+		_addedFlowFiles.clear();
 		for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
 		{
 			FlowFileRecord *record = it->second;
 			delete record;
 		}
+		_updatedFlowFiles.clear();
+		_deletedFlowFiles.clear();
 		_logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
 	}
 	catch (std::exception &exception)
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/TailFile.cpp
----------------------------------------------------------------------
diff --git a/src/TailFile.cpp b/src/TailFile.cpp
new file mode 100644
index 0000000..445255b
--- /dev/null
+++ b/src/TailFile.cpp
@@ -0,0 +1,306 @@
+/**
+ * @file TailFile.cpp
+ * TailFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+#include <algorithm>
+#include <cstring>
+#include <fstream>
+
+#include "TimeUtil.h"
+#include "TailFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string TailFile::ProcessorName("TailFile");
+Property TailFile::FileName("File to Tail", "Fully-qualified filename of the file that should be tailed", "");
+Property TailFile::StateFile("State File",
+		"Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off", "");
+Relationship TailFile::Success("success", "All files are routed to success");
+
+void TailFile::initialize()
+{
+	//! Set the supported properties
+	std::set<Property> properties;
+	properties.insert(FileName);
+	properties.insert(StateFile);
+	setSupportedProperties(properties);
+	//! Set the supported relationships
+	std::set<Relationship> relationships;
+	relationships.insert(Success);
+	setSupportedRelationships(relationships);
+}
+
+std::string TailFile::trimLeft(const std::string& s)
+{
+	const char *WHITESPACE = " \n\r\t";
+	size_t startpos = s.find_first_not_of(WHITESPACE);
+	return (startpos == std::string::npos) ? "" : s.substr(startpos);
+}
+
+std::string TailFile::trimRight(const std::string& s)
+{
+	const char *WHITESPACE = " \n\r\t";
+	size_t endpos = s.find_last_not_of(WHITESPACE);
+	return (endpos == std::string::npos) ? "" : s.substr(0, endpos+1);
+}
+
+void TailFile::parseStateFileLine(char *buf)
+{
+	char *line = buf;
+
+	while ((line[0] == ' ') || (line[0] =='\t'))
+		++line;
+
+	char first = line[0];
+	if ((first == '\0') || (first == '#')  || (first == '\r') || (first == '\n') || (first == '='))
+	{
+		return;
+	}
+
+	char *equal = strchr(line, '=');
+	if (equal == NULL)
+	{
+		return;
+	}
+
+	equal[0] = '\0';
+	std::string key = line;
+
+	equal++;
+	while ((equal[0] == ' ') || (equal[0] == '\t'))
+		++equal;
+
+	first = equal[0];
+	if ((first == '\0') || (first == '\r') || (first== '\n'))
+	{
+		return;
+	}
+
+	std::string value = equal;
+	key = trimRight(key);
+	value = trimRight(value);
+
+	if (key == "FILENAME")
+		this->_currentTailFileName = value;
+	if (key == "POSITION")
+	{
+		// the position is uint64_t (std::stoi overflows past INT_MAX); a corrupt value must not throw
+		try
+		{
+			this->_currentTailFilePosition = std::stoull(value);
+		}
+		catch (const std::exception &)
+		{
+			_logger->log_error("TailFile ignoring invalid POSITION %s in state file", value.c_str());
+		}
+	}
+
+	return;
+}
+
+void TailFile::recoverState()
+{
+	std::ifstream file(_stateFile.c_str(), std::ifstream::in);
+	if (!file.good())
+	{
+		_logger->log_error("load state file failed %s", _stateFile.c_str());
+		return;
+	}
+	const unsigned int bufSize = 512;
+	char buf[bufSize];
+	// test the stream after each read (not good()) so a last line without '\n' is still parsed
+	while (file.getline(buf, bufSize))
+	{
+		parseStateFileLine(buf);
+	}
+}
+
+void TailFile::storeState()
+{
+	std::ofstream file(_stateFile.c_str());
+	if (!file.is_open())
+	{
+		_logger->log_error("store state file failed %s", _stateFile.c_str());
+		return;
+	}
+	file << "FILENAME=" << this->_currentTailFileName << "\n";
+	file << "POSITION=" << this->_currentTailFilePosition << "\n";
+	file.close();
+}
+
+static bool sortTailMatchedFileItem(const TailMatchedFileItem &i, const TailMatchedFileItem &j)
+{
+	return (i.modifiedTime < j.modifiedTime);
+}
+void TailFile::checkRollOver()
+{
+	struct stat statbuf;
+	std::vector<TailMatchedFileItem> matchedFiles;
+	std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+
+	if (stat(fullPath.c_str(), &statbuf) == 0)
+	{
+		if ((uint64_t) statbuf.st_size > this->_currentTailFilePosition)
+			// there are new input for the current tail file
+			return;
+
+		uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
+		std::string pattern = _fileName;
+		std::size_t found = _fileName.find_last_of(".");
+		if (found != std::string::npos)
+			pattern = _fileName.substr(0,found);
+		DIR *d;
+		d = opendir(this->_fileLocation.c_str());
+		if (!d)
+			return;
+		while (1)
+		{
+			struct dirent *entry;
+			entry = readdir(d);
+			if (!entry)
+				break;
+			std::string fileName = entry->d_name;
+			std::string fileFullName = this->_fileLocation + "/" + fileName;
+			// match the pattern against the file name only: matching the full path accepts every
+			// file whenever the directory name itself contains the pattern. stat() + S_ISREG
+			// replaces the d_type bit test, which misclassified sockets and ignored DT_UNKNOWN.
+			if (fileName.find(pattern) != std::string::npos && stat(fileFullName.c_str(), &statbuf) == 0
+					&& S_ISREG(statbuf.st_mode))
+			{
+				if (((uint64_t) (statbuf.st_mtime) * 1000) >= modifiedTimeCurrentTailFile)
+				{
+					TailMatchedFileItem item;
+					item.fileName = fileName;
+					item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
+					matchedFiles.push_back(item);
+				}
+			}
+		}
+		closedir(d);
+
+		// Sort the list based on modified time
+		std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
+		for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); it!=matchedFiles.end(); ++it)
+		{
+			TailMatchedFileItem item = *it;
+			if (item.fileName == _currentTailFileName)
+			{
+				++it;
+				if (it!=matchedFiles.end())
+				{
+					TailMatchedFileItem nextItem = *it;
+					_logger->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
+					_currentTailFileName = nextItem.fileName;
+					_currentTailFilePosition = 0;
+					storeState();
+				}
+				break;
+			}
+		}
+	}
+	else
+		return;
+}
+
+
+void TailFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+	if (context->getProperty(FileName.getName(), value))
+	{
+		std::size_t found = value.find_last_of("/\\");
+		if (found == std::string::npos)
+		{
+			// a bare file name refers to the current working directory
+			this->_fileLocation = ".";
+			this->_fileName = value;
+		}
+		else
+		{
+			this->_fileLocation = value.substr(0,found);
+			this->_fileName = value.substr(found+1);
+		}
+	}
+	if (context->getProperty(StateFile.getName(), value))
+	{
+		_stateFile = value;
+	}
+	if (!this->_stateRecovered)
+	{
+		_stateRecovered = true;
+		this->_currentTailFileName = _fileName;
+		this->_currentTailFilePosition = 0;
+		// recover the state if we have not done so
+		this->recoverState();
+	}
+	checkRollOver();
+	std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+	struct stat statbuf;
+	if (stat(fullPath.c_str(), &statbuf) == 0)
+	{
+		if ((uint64_t) statbuf.st_size < this->_currentTailFilePosition)
+		{
+			// the file shrank (truncated or replaced) and checkRollOver found no successor:
+			// restart from the beginning instead of yielding forever waiting for it to regrow
+			_logger->log_info("TailFile %s was truncated, resetting position to 0", _currentTailFileName.c_str());
+			this->_currentTailFilePosition = 0;
+			storeState();
+		}
+		if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition)
+		{
+			// there are no new input for the current tail file
+			context->yield();
+			return;
+		}
+		FlowFileRecord *flowFile = session->create();
+		if (!flowFile)
+			return;
+		std::size_t found = _currentTailFileName.find_last_of(".");
+		std::string baseName = _currentTailFileName.substr(0,found);
+		// names without '.' have no extension (substr(npos + 1) would return the whole name)
+		std::string extension = (found == std::string::npos) ? "" : "." + _currentTailFileName.substr(found+1);
+		flowFile->updateAttribute(PATH, _fileLocation);
+		flowFile->addAttribute(ABSOLUTE_PATH, fullPath);
+		session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
+		// name the flow file after the byte range it carries, e.g. log.0-1024.txt
+		std::string logName = baseName + "." + std::to_string(_currentTailFilePosition) + "-" +
+				std::to_string(_currentTailFilePosition + flowFile->getSize()) + extension;
+		flowFile->updateAttribute(FILENAME, logName);
+		session->transfer(flowFile, Success);
+		_logger->log_info("TailFile %s for %llu bytes", _currentTailFileName.c_str(),
+				(unsigned long long) flowFile->getSize());
+		this->_currentTailFilePosition += flowFile->getSize();
+		storeState();
+	}
+}
+