Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 955f7ab51 -> b3848a319


MINIFI-69: Add GetFile and TailFile processors
This closes #5.

Signed-off-by: Aldrin Piri <ald...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/b3848a31
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/b3848a31
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/b3848a31

Branch: refs/heads/master
Commit: b3848a3192f349023ef209b943519f7812e3faca
Parents: 955f7ab
Author: Bin Qiu <benqiu2...@gmail.com>
Authored: Tue Aug 2 17:16:14 2016 -0400
Committer: Aldrin Piri <ald...@apache.org>
Committed: Tue Aug 2 17:16:14 2016 -0400

----------------------------------------------------------------------
 conf/flowGetFile.xml   | 127 ++++++++++++++++++++
 conf/flowTailFile.xml  | 101 ++++++++++++++++
 inc/FlowController.h   |   2 +
 inc/GetFile.h          | 114 ++++++++++++++++++
 inc/ProcessSession.h   |   2 +-
 inc/TailFile.h         |  93 +++++++++++++++
 src/FlowController.cpp |   8 ++
 src/GetFile.cpp        | 277 ++++++++++++++++++++++++++++++++++++++++++++
 src/ProcessSession.cpp |  16 ++-
 src/TailFile.cpp       | 272 +++++++++++++++++++++++++++++++++++++++++++
 10 files changed, 1010 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/conf/flowGetFile.xml
----------------------------------------------------------------------
diff --git a/conf/flowGetFile.xml b/conf/flowGetFile.xml
new file mode 100644
index 0000000..25369fb
--- /dev/null
+++ b/conf/flowGetFile.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>9347f92c-3dcc-4ece-ba97-d67eed846a39</id>
+      <name>GetFile</name>
+      <position x="2495.369384765625" y="749.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.GetFile</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>5 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>STOPPED</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Input Directory</name>
+        <value>/Users/binqiu/GetFile</value>
+      </property>
+      <property>
+        <name>File Filter</name>
+        <value>[^\.].*</value>
+      </property>
+      <property>
+        <name>Path Filter</name>
+      </property>
+      <property>
+        <name>Batch Size</name>
+        <value>10</value>
+      </property>
+      <property>
+        <name>Keep Source File</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Recurse Subdirectories</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Polling Interval</name>
+        <value>0 sec</value>
+      </property>
+      <property>
+        <name>Ignore Hidden Files</name>
+        <value>true</value>
+      </property>
+      <property>
+        <name>Minimum File Age</name>
+        <value>0 sec</value>
+      </property>
+      <property>
+        <name>Maximum File Age</name>
+      </property>
+      <property>
+        <name>Minimum File Size</name>
+        <value>0 B</value>
+      </property>
+      <property>
+        <name>Maximum File Size</name>
+      </property>
+    </processor>
+    <processor>
+      <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id>
+      <name>LogAttribute</name>
+      <position x="3239.369384765625" y="822.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>STOPPED</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <connection>
+      <id>d02db6bf-8c6f-463b-b07e-47f7ab726502</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>9347f92c-3dcc-4ece-ba97-d67eed846a39</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId>
+      <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/conf/flowTailFile.xml
----------------------------------------------------------------------
diff --git a/conf/flowTailFile.xml b/conf/flowTailFile.xml
new file mode 100644
index 0000000..022bc4e
--- /dev/null
+++ b/conf/flowTailFile.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id>
+      <name>LogAttribute</name>
+      <position x="3239.369384765625" y="822.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>c6ced523-be07-48d3-90c2-82b87d208f2e</id>
+      <name>TailFile</name>
+      <position x="2629.369384765625" y="761.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.TailFile</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>1 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>File to Tail</name>
+        <value>/Users/binqiu/getFile/log.txt</value>
+      </property>
+      <property>
+        <name>Rolling Filename Pattern</name>
+      </property>
+      <property>
+        <name>State File</name>
+        <value>log.state</value>
+      </property>
+      <property>
+        <name>Initial Start Position</name>
+        <value>Beginning of File</value>
+      </property>
+      <property>
+        <name>File Location</name>
+        <value>Local</value>
+      </property>
+    </processor>
+    <connection>
+      <id>12899241-005e-4d98-b3fa-7e00b1840bac</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>c6ced523-be07-48d3-90c2-82b87d208f2e</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId>
+      <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/FlowController.h
----------------------------------------------------------------------
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 3895096..3dd0952 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -46,6 +46,8 @@
 #include "TimerDrivenSchedulingAgent.h"
 #include "FlowControlProtocol.h"
 #include "RemoteProcessorGroupPort.h"
+#include "GetFile.h"
+#include "TailFile.h"
 
 //! Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/GetFile.h
----------------------------------------------------------------------
diff --git a/inc/GetFile.h b/inc/GetFile.h
new file mode 100644
index 0000000..333813d
--- /dev/null
+++ b/inc/GetFile.h
@@ -0,0 +1,114 @@
+/**
+ * @file GetFile.h
+ * GetFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __GET_FILE_H__
+#define __GET_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! GetFile Class
+//! Lists a local directory (optionally recursively) and turns each accepted
+//! file into a flow file routed to the Success relationship.
+class GetFile : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       GetFile(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _directory = ".";
+               _recursive = true;
+               _keepSourceFile = false;
+               _minAge = 0;
+               _maxAge = 0;
+               _minSize = 0;
+               _maxSize = 0;
+               _ignoreHiddenFile = true;
+               _pollInterval = 0;
+               _batchSize = 10;
+               _lastDirectoryListingTime = getTimeMillis();
+       }
+       //! Destructor
+       virtual ~GetFile()
+       {
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property Directory;
+       static Property Recurse;
+       static Property KeepSourceFile;
+       static Property MinAge;
+       static Property MaxAge;
+       static Property MinSize;
+       static Property MaxSize;
+       static Property IgnoreHiddenFile;
+       static Property PollInterval;
+       static Property BatchSize;
+       //! Supported Relationships
+       static Relationship Success;
+
+public:
+       //! OnTrigger method, implemented by NiFi GetFile
+       virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+       //! Initialize, overridden by NiFi GetFile
+       virtual void initialize(void);
+       //! Perform a directory listing of `dir`, queueing accepted files
+       void performListing(std::string dir);
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       //! Queue of full path file names waiting to be imported; guarded by _mtx
+       std::queue<std::string> _dirList;
+       //! Get Listing size
+       uint64_t getListingSize() {
+               std::lock_guard<std::mutex> lock(_mtx);
+               return _dirList.size();
+       }
+       //! Whether the directory listing is empty
+       bool isListingEmpty();
+       //! Put full path file name into directory listing
+       void putListing(std::string fileName);
+       //! Poll up to maxSize entries from the directory listing into `list`
+       void pollListing(std::queue<std::string> &list, int maxSize);
+       //! Check whether file can be added to the directory listing
+       bool acceptFile(std::string fileName);
+       //! Mutex for protection of the directory listing
+       std::mutex _mtx;
+       //! Directory to scan ("Input Directory")
+       std::string _directory;
+       //! Descend into sub-directories ("Recurse Subdirectories")
+       bool _recursive;
+       //! Leave source files in place after import ("Keep Source File")
+       bool _keepSourceFile;
+       //! File age bounds in milliseconds; 0 disables the bound
+       int64_t _minAge;
+       int64_t _maxAge;
+       //! File size bounds in bytes; 0 disables the bound
+       int64_t _minSize;
+       int64_t _maxSize;
+       //! Ignore hidden (dot-prefixed) files ("Ignore Hidden Files")
+       bool _ignoreHiddenFile;
+       //! Minimum time between directory listings, in milliseconds
+       int64_t _pollInterval;
+       //! Maximum number of files imported per trigger ("Batch Size")
+       int64_t _batchSize;
+       //! getTimeMillis() reference point the polling interval is measured from
+       uint64_t _lastDirectoryListingTime;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/ProcessSession.h
----------------------------------------------------------------------
diff --git a/inc/ProcessSession.h b/inc/ProcessSession.h
index d38e71b..c8ec3a5 100644
--- a/inc/ProcessSession.h
+++ b/inc/ProcessSession.h
@@ -83,7 +83,7 @@ public:
        //! Penalize the flow
        void penalize(FlowFileRecord *flow);
        //! Import the existed file into the flow
-       void import(std::string source, FlowFileRecord *flow);
+       void import(std::string source, FlowFileRecord *flow, bool keepSource = 
true, uint64_t offset = 0);
 
 protected:
        //! FlowFiles being modified by current process session

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/inc/TailFile.h
----------------------------------------------------------------------
diff --git a/inc/TailFile.h b/inc/TailFile.h
new file mode 100644
index 0000000..5c4ba09
--- /dev/null
+++ b/inc/TailFile.h
@@ -0,0 +1,93 @@
+/**
+ * @file TailFile.h
+ * TailFile class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TAIL_FILE_H__
+#define __TAIL_FILE_H__
+
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! TailFile Class
+//! Tails the file named by the FileName property. The current tail file name and
+//! read position are held in member state and saved to / restored from the
+//! StateFile via storeState() / recoverState().
+class TailFile : public Processor
+{
+public:
+       //! Constructor
+       /*!
+        * Create a new processor
+        */
+       TailFile(std::string name, uuid_t uuid = NULL)
+       : Processor(name, uuid)
+       {
+               _logger = Logger::getLogger();
+               _stateRecovered = false;
+               // Give the tail state defined values until recoverState() loads saved
+               // state; the destructor calls storeState() unconditionally, which must
+               // never see indeterminate values.
+               _currentTailFilePosition = 0;
+               _currentTailFileCreatedTime = 0;
+       }
+       //! Destructor: persists the current tail state
+       virtual ~TailFile()
+       {
+               storeState();
+       }
+       //! Processor Name
+       static const std::string ProcessorName;
+       //! Supported Properties
+       static Property FileName;
+       static Property StateFile;
+       //! Supported Relationships
+       static Relationship Success;
+
+public:
+       //! OnTrigger method, implemented by NiFi TailFile
+       virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+       //! Initialize, overridden by NiFi TailFile
+       virtual void initialize(void);
+       //! recoverState
+       void recoverState();
+       //! storeState
+       void storeState();
+
+protected:
+
+private:
+       //! Logger
+       Logger *_logger;
+       std::string _fileLocation;
+       //! Property Specified Tailed File Name
+       std::string _fileName;
+       //! File to save state
+       std::string _stateFile;
+       //! State related to the tailed file
+       std::string _currentTailFileName;
+       uint64_t _currentTailFilePosition;
+       bool _stateRecovered;
+       uint64_t _currentTailFileCreatedTime;
+       //! Utils functions for parse state file
+       std::string trimLeft(const std::string& s);
+       std::string trimRight(const std::string& s);
+       void parseStateFileLine(char *buf);
+       void checkRollOver();
+
+};
+
+//! Matched File Item for Roll over check: a candidate file name and its
+//! modification time (units are set by the code filling it in; presumably
+//! milliseconds like GetFile - confirm in TailFile.cpp).
+struct TailMatchedFileItem {
+       std::string fileName;
+       uint64_t modifiedTime;
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index b18953e..7f4061e 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -140,6 +140,14 @@ Processor *FlowController::createProcessor(std::string 
name, uuid_t uuid)
        {
                processor = new RealTimeDataCollector(name, uuid);
        }
+       else if (name == GetFile::ProcessorName)
+       {
+               processor = new GetFile(name, uuid);
+       }
+       else if (name == TailFile::ProcessorName)
+       {
+               processor = new TailFile(name, uuid);
+       }
        else
        {
                _logger->log_error("No Processor defined for %s", name.c_str());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/GetFile.cpp
----------------------------------------------------------------------
diff --git a/src/GetFile.cpp b/src/GetFile.cpp
new file mode 100644
index 0000000..32503f5
--- /dev/null
+++ b/src/GetFile.cpp
@@ -0,0 +1,277 @@
+/**
+ * @file GetFile.cpp
+ * GetFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+
+#include "TimeUtil.h"
+#include "GetFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string GetFile::ProcessorName("GetFile");
+Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
+Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
+Property GetFile::IgnoreHiddenFile("Ignore Hidden Files",
+               "Indicates whether or not hidden files should be ignored", "true");
+Property GetFile::KeepSourceFile("Keep Source File",
+               "If true, the file is not deleted after it has been copied to the Content Repository", "false");
+//! Age bounds are compared with the file's last-modification time in acceptFile():
+//! "Maximum File Age" rejects files older than the bound, "Minimum File Age" rejects younger ones.
+Property GetFile::MaxAge("Maximum File Age",
+               "The maximum age that a file must be in order to be pulled; any file older than this "
+               "amount of time (according to last modification date) will be ignored", "0 sec");
+Property GetFile::MinAge("Minimum File Age",
+               "The minimum age that a file must be in order to be pulled; any file younger than this "
+               "amount of time (according to last modification date) will be ignored", "0 sec");
+Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
+Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
+Property GetFile::PollInterval("Polling Interval",
+               "Indicates how long to wait before performing a directory listing", "0 sec");
+Property GetFile::Recurse("Recurse Subdirectories",
+               "Indicates whether or not to pull files from subdirectories", "true");
+Relationship GetFile::Success("success", "All files are routed to success");
+
+//! Advertise the configuration properties and the single output relationship
+//! understood by this processor.
+void GetFile::initialize()
+{
+       // Every property that onTrigger() reads.
+       std::set<Property> supported = {
+               BatchSize, Directory, IgnoreHiddenFile, KeepSourceFile, MaxAge,
+               MinAge, MaxSize, MinSize, PollInterval, Recurse
+       };
+       setSupportedProperties(supported);
+
+       // All imported files leave through Success.
+       std::set<Relationship> routes = { Success };
+       setSupportedRelationships(routes);
+}
+
+//! Read the processor configuration, refresh the directory listing when it is
+//! exhausted and the polling interval has elapsed, then import up to "Batch Size"
+//! listed files as new flow files routed to Success.
+void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       std::string value;
+       // Parse a time-period property (e.g. "5 sec"): StringToTime writes the numeric
+       // part into `target`, then ConvertTimeUnitToMS rescales it to milliseconds in place.
+       auto readMillis = [&](std::string name, int64_t &target)
+       {
+               std::string text;
+               TimeUnit unit;
+               if (context->getProperty(name, text) &&
+                       Property::StringToTime(text, target, unit))
+               {
+                       Property::ConvertTimeUnitToMS(target, unit, target);
+               }
+       };
+
+       if (context->getProperty(Directory.getName(), value))
+       {
+               _directory = value;
+       }
+       if (context->getProperty(BatchSize.getName(), value))
+       {
+               Property::StringToInt(value, _batchSize);
+       }
+       if (context->getProperty(IgnoreHiddenFile.getName(), value))
+       {
+               Property::StringToBool(value, _ignoreHiddenFile);
+       }
+       if (context->getProperty(KeepSourceFile.getName(), value))
+       {
+               Property::StringToBool(value, _keepSourceFile);
+       }
+       readMillis(MaxAge.getName(), _maxAge);
+       readMillis(MinAge.getName(), _minAge);
+       if (context->getProperty(MaxSize.getName(), value))
+       {
+               Property::StringToInt(value, _maxSize);
+       }
+       if (context->getProperty(MinSize.getName(), value))
+       {
+               Property::StringToInt(value, _minSize);
+       }
+       readMillis(PollInterval.getName(), _pollInterval);
+       if (context->getProperty(Recurse.getName(), value))
+       {
+               Property::StringToBool(value, _recursive);
+       }
+
+       // Rescan only after the previous listing has been drained, and only once the
+       // polling interval (milliseconds; <= 0 means "no wait") has elapsed.
+       if (isListingEmpty())
+       {
+               uint64_t now = getTimeMillis();
+               if (_pollInterval <= 0 ||
+                       (now - _lastDirectoryListingTime) > static_cast<uint64_t>(_pollInterval))
+               {
+                       performListing(_directory);
+                       // Measure the next interval from this scan; otherwise the check
+                       // above passes on every trigger once the first interval elapses.
+                       _lastDirectoryListingTime = now;
+               }
+       }
+
+       if (isListingEmpty())
+               return;
+
+       try
+       {
+               std::queue<std::string> list;
+               pollListing(list, static_cast<int>(_batchSize));
+               while (!list.empty())
+               {
+                       std::string fileName = list.front();
+                       list.pop();
+                       _logger->log_info("GetFile process %s", fileName.c_str());
+                       FlowFileRecord *flowFile = session->create();
+                       if (!flowFile)
+                               return;
+                       // performListing() queues entries as "<dir>/<name>".
+                       std::size_t found = fileName.find_last_of("/\\");
+                       std::string path = fileName.substr(0, found);
+                       std::string name = fileName.substr(found + 1);
+                       flowFile->updateAttribute(FILENAME, name);
+                       flowFile->updateAttribute(PATH, path);
+                       flowFile->addAttribute(ABSOLUTE_PATH, fileName);
+                       // Streams the file into the flow file; the source is removed
+                       // afterwards unless _keepSourceFile is set.
+                       session->import(fileName, flowFile, _keepSourceFile);
+                       session->transfer(flowFile, Success);
+               }
+       }
+       catch (std::exception &exception)
+       {
+               _logger->log_debug("GetFile Caught Exception %s", exception.what());
+               throw;
+       }
+}
+
+//! True when no files are waiting in the pending directory listing.
+//! Locks _mtx because putListing()/pollListing() mutate the same queue.
+bool GetFile::isListingEmpty()
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+       const bool nothingQueued = (_dirList.size() == 0);
+       return nothingQueued;
+}
+
+//! Append one fully qualified file name to the pending listing under _mtx.
+void GetFile::putListing(std::string fileName)
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+       _dirList.emplace(fileName);
+}
+
+//! Move entries from the pending listing into `list` in FIFO order until the
+//! listing is empty or `list` holds `maxSize` entries (including any it held on
+//! entry). A `maxSize` of zero or less drains the whole listing. Locks _mtx.
+void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
+{
+       std::lock_guard<std::mutex> guard(_mtx);
+
+       const bool unbounded = (maxSize <= 0);
+       for (; !_dirList.empty(); _dirList.pop())
+       {
+               if (!unbounded && list.size() >= static_cast<std::size_t>(maxSize))
+                       break;
+               list.push(_dirList.front());
+       }
+}
+
+//! Decide whether a listed path should be ingested. It must stat() successfully,
+//! be a regular file, satisfy the configured size and age bounds (0 disables a
+//! bound), not be hidden when hidden files are ignored, be readable, and - when
+//! the source will be deleted after import - sit in a writable directory.
+bool GetFile::acceptFile(std::string fileName)
+{
+       struct stat statbuf;
+
+       if (stat(fileName.c_str(), &statbuf) != 0)
+               return false;
+
+       // Only regular files can be imported; directories, FIFOs, sockets and devices
+       // can reach here via symlinks or when readdir() reports DT_UNKNOWN.
+       if (!S_ISREG(statbuf.st_mode))
+               return false;
+
+       if (_minSize > 0 && statbuf.st_size < _minSize)
+               return false;
+
+       if (_maxSize > 0 && statbuf.st_size > _maxSize)
+               return false;
+
+       // Age in milliseconds since last modification. A modification time in the
+       // future (clock skew) would underflow the unsigned subtraction, so clamp to 0.
+       uint64_t modifiedTime = static_cast<uint64_t>(statbuf.st_mtime) * 1000;
+       uint64_t now = getTimeMillis();
+       uint64_t fileAge = (now > modifiedTime) ? (now - modifiedTime) : 0;
+       if (_minAge > 0 && fileAge < static_cast<uint64_t>(_minAge))
+               return false;
+       if (_maxAge > 0 && fileAge > static_cast<uint64_t>(_maxAge))
+               return false;
+
+       // "Hidden" is a property of the base name. Testing the first character of the
+       // whole path would reject every file listed under the default directory ".".
+       std::size_t slash = fileName.find_last_of("/\\");
+       std::string baseName = (slash == std::string::npos) ? fileName : fileName.substr(slash + 1);
+       if (_ignoreHiddenFile && !baseName.empty() && baseName[0] == '.')
+               return false;
+
+       if (access(fileName.c_str(), R_OK) != 0)
+               return false;
+
+       // Removing a file after import needs write permission on its directory (the
+       // file's own mode does not matter to unlink), else it would be re-ingested.
+       if (_keepSourceFile == false)
+       {
+               std::string parentDir;
+               if (slash == std::string::npos)
+                       parentDir = ".";
+               else if (slash == 0)
+                       parentDir = "/";
+               else
+                       parentDir = fileName.substr(0, slash);
+               if (access(parentDir.c_str(), W_OK) != 0)
+                       return false;
+       }
+
+       return true;
+}
+
+//! List `dir`, queueing every entry accepted by acceptFile(). When recursion is
+//! enabled, sub-directories (other than "." and "..") are listed the same way.
+void GetFile::performListing(std::string dir)
+{
+       DIR *d = opendir(dir.c_str());
+       if (!d)
+               return;
+       for (struct dirent *entry = readdir(d); entry != NULL; entry = readdir(d))
+       {
+               std::string d_name = entry->d_name;
+               std::string path = dir + "/" + d_name;
+               // d_type is an enumeration, not a bit mask: DT_SOCK and DT_WHT share the
+               // DT_DIR bit, so compare for equality. Some file systems report
+               // DT_UNKNOWN; fall back to lstat() there (lstat, so symlinked directories
+               // are not followed and cannot create listing cycles).
+               bool isDirectory = (entry->d_type == DT_DIR);
+               if (entry->d_type == DT_UNKNOWN)
+               {
+                       struct stat statbuf;
+                       isDirectory = (lstat(path.c_str(), &statbuf) == 0 && S_ISDIR(statbuf.st_mode));
+               }
+
+               if (isDirectory)
+               {
+                       if (_recursive && d_name != "." && d_name != "..")
+                               performListing(path);
+               }
+               else if (acceptFile(path))
+               {
+                       // check whether we can take this file
+                       putListing(path);
+               }
+       }
+       closedir(d);
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
index 2628ae3..141d1ee 100644
--- a/src/ProcessSession.cpp
+++ b/src/ProcessSession.cpp
@@ -349,7 +349,7 @@ void ProcessSession::read(FlowFileRecord *flow, 
InputStreamCallback *callback)
        }
 }
 
-void ProcessSession::import(std::string source, FlowFileRecord *flow)
+void ProcessSession::import(std::string source, FlowFileRecord *flow, bool 
keepSource, uint64_t offset)
 {
        ResourceClaim *claim = NULL;
 
@@ -368,6 +368,7 @@ void ProcessSession::import(std::string source, 
FlowFileRecord *flow)
                if (fs.is_open() && input.is_open())
                {
                        // Open the source file and stream to the flow file
+                       input.seekg(offset, fs.beg);
                        while (input.good())
                        {
                                input.read(buf, size);
@@ -394,6 +395,8 @@ void ProcessSession::import(std::string source, 
FlowFileRecord *flow)
                                                flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
                                fs.close();
                                input.close();
+                               if (!keepSource)
+                                       std::remove(source.c_str());
                        }
                        else
                        {
@@ -610,6 +613,12 @@ void ProcessSession::commit()
                        FlowFileRecord *record = it->second;
                        delete record;
                }
+               // All done
+               _updatedFlowFiles.clear();
+               _addedFlowFiles.clear();
+               _clonedFlowFiles.clear();
+               _deletedFlowFiles.clear();
+               _originalFlowFiles.clear();
                _logger->log_trace("ProcessSession committed for %s", 
_processContext->getProcessor()->getName().c_str());
        }
        catch (std::exception &exception)
@@ -642,22 +651,27 @@ void ProcessSession::rollback()
                        else
                                delete record;
                }
+               _originalFlowFiles.clear();
                // Process the clone flow files
                for (it = _clonedFlowFiles.begin(); it!= 
_clonedFlowFiles.end(); it++)
                {
                        FlowFileRecord *record = it->second;
                        delete record;
                }
+               _clonedFlowFiles.clear();
                for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); 
it++)
                {
                        FlowFileRecord *record = it->second;
                        delete record;
                }
+               _addedFlowFiles.clear();
                for (it = _updatedFlowFiles.begin(); it!= 
_updatedFlowFiles.end(); it++)
                {
                        FlowFileRecord *record = it->second;
                        delete record;
                }
+               _updatedFlowFiles.clear();
+               _deletedFlowFiles.clear();
                _logger->log_trace("ProcessSession rollback for %s", 
_processContext->getProcessor()->getName().c_str());
        }
        catch (std::exception &exception)

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/b3848a31/src/TailFile.cpp
----------------------------------------------------------------------
diff --git a/src/TailFile.cpp b/src/TailFile.cpp
new file mode 100644
index 0000000..445255b
--- /dev/null
+++ b/src/TailFile.cpp
@@ -0,0 +1,272 @@
+/**
+ * @file TailFile.cpp
+ * TailFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <vector>
+#include <queue>
+#include <map>
+#include <set>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <time.h>
+#include <sstream>
+#include <stdio.h>
+#include <string>
+#include <iostream>
+#include <dirent.h>
+#include <limits.h>
+#include <unistd.h>
+
+#include "TimeUtil.h"
+#include "TailFile.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string TailFile::ProcessorName("TailFile");
+
+//! "File to Tail": full path of the tailed file; onTrigger() splits it into directory and file name
+Property TailFile::FileName(
+               "File to Tail",
+               "Fully-qualified filename of the file that should be tailed",
+               "");
+//! "State File": where storeState() persists the current file name and read position
+Property TailFile::StateFile(
+               "State File",
+               "Specifies the file that should be used for storing state about "
+               "what data has been ingested so that upon restart NiFi can resume from where it "
+               "left off",
+               "");
+//! The only relationship: every flow file produced by onTrigger() is transferred here
+Relationship TailFile::Success("success", "All files are routed to success");
+
+void TailFile::initialize()
+{
+       //! Advertise the configurable properties: the file to tail and the state file
+       std::set<Property> supportedProperties = { FileName, StateFile };
+       setSupportedProperties(supportedProperties);
+
+       //! Advertise the single outgoing relationship
+       std::set<Relationship> supportedRelationships = { Success };
+       setSupportedRelationships(supportedRelationships);
+}
+
+std::string TailFile::trimLeft(const std::string& s)
+{
+       //! Drop leading spaces, tabs, CRs and LFs; an all-whitespace input yields ""
+       static const char *const kWhitespace = " \n\r\t";
+       const size_t firstKept = s.find_first_not_of(kWhitespace);
+       if (firstKept == std::string::npos)
+               return "";
+       return s.substr(firstKept);
+}
+
+std::string TailFile::trimRight(const std::string& s)
+{
+       //! Drop trailing spaces, tabs, CRs and LFs; an all-whitespace input yields ""
+       static const char *const kWhitespace = " \n\r\t";
+       const size_t lastKept = s.find_last_not_of(kWhitespace);
+       if (lastKept == std::string::npos)
+               return "";
+       const size_t keptLength = lastKept + 1;
+       return s.substr(0, keptLength);
+}
+
+void TailFile::parseStateFileLine(char *buf)
+{
+       // Parses one "KEY=VALUE" line of the state file (the format storeState() writes)
+       // and applies the FILENAME / POSITION entries. Blank lines, '#' comments and lines
+       // without '=' or without a value are ignored. `buf` is modified in place.
+       char *line = buf;
+
+       while ((line[0] == ' ') || (line[0] == '\t'))
+               ++line;
+
+       char first = line[0];
+       if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '='))
+       {
+               return;
+       }
+
+       char *equal = strchr(line, '=');
+       if (equal == NULL)
+       {
+               return;
+       }
+
+       // Terminate the key at '=' so `line` now holds only the key text.
+       equal[0] = '\0';
+       std::string key = line;
+
+       equal++;
+       while ((equal[0] == ' ') || (equal[0] == '\t'))
+               ++equal;
+
+       first = equal[0];
+       if ((first == '\0') || (first == '\r') || (first == '\n'))
+       {
+               return;
+       }
+
+       std::string value = equal;
+       key = trimRight(key);
+       value = trimRight(value);
+
+       if (key == "FILENAME")
+               this->_currentTailFileName = value;
+       if (key == "POSITION")
+       {
+               // std::stoi overflowed (throwing std::out_of_range) for offsets beyond INT_MAX,
+               // i.e. tailed files over 2 GiB, and threw std::invalid_argument on a corrupt
+               // value; neither was caught here. Accept only plain non-negative decimal
+               // numbers and parse them as unsigned 64-bit; otherwise log and keep the
+               // current position.
+               // NOTE(review): assumes _currentTailFilePosition is a 64-bit unsigned
+               // type (declared in TailFile.h) — confirm.
+               if (value.find_first_not_of("0123456789") != std::string::npos)
+               {
+                       _logger->log_error("TailFile ignoring invalid POSITION %s in state file", value.c_str());
+                       return;
+               }
+               try
+               {
+                       this->_currentTailFilePosition = std::stoull(value);
+               }
+               catch (...)
+               {
+                       _logger->log_error("TailFile could not parse POSITION %s in state file", value.c_str());
+               }
+       }
+}
+
+void TailFile::recoverState()
+{
+       // Restores _currentTailFileName / _currentTailFilePosition from _stateFile,
+       // one KEY=VALUE line at a time via parseStateFileLine().
+       std::ifstream file(_stateFile.c_str(), std::ifstream::in);
+       if (!file.good())
+       {
+               _logger->log_error("load state file failed %s", _stateFile.c_str());
+               return;
+       }
+       // std::getline into a std::string is used instead of a fixed 512-byte
+       // istream::getline buffer. The old loop skipped a final line lacking a trailing
+       // newline (eofbit made good() false). It also stopped parsing the whole file at
+       // the first line of 512 bytes or more (failbit).
+       std::string line;
+       while (std::getline(file, line))
+       {
+               // parseStateFileLine() edits its argument in place, so give it a writable,
+               // NUL-terminated copy of the line.
+               std::vector<char> buf(line.begin(), line.end());
+               buf.push_back('\0');
+               parseStateFileLine(&buf[0]);
+       }
+}
+
+void TailFile::storeState()
+{
+       // Truncates and rewrites _stateFile with the current file name and read
+       // position, in the KEY=VALUE format parseStateFileLine() reads back.
+       // NOTE(review): the rewrite is not atomic; a crash mid-write can leave a
+       // partial state file.
+       std::ofstream out(_stateFile.c_str());
+       if (out.is_open())
+       {
+               out << "FILENAME=" << this->_currentTailFileName << "\n"
+                   << "POSITION=" << this->_currentTailFilePosition << "\n";
+               out.close();
+               return;
+       }
+       _logger->log_error("store state file failed %s", _stateFile.c_str());
+}
+
+//! Strict-weak-ordering predicate for std::sort: older modification time first.
+static bool sortTailMatchedFileItem(TailMatchedFileItem lhs, TailMatchedFileItem rhs)
+{
+       return rhs.modifiedTime > lhs.modifiedTime;
+}
+void TailFile::checkRollOver()
+{
+       // When the current tail file has no unread data, look in _fileLocation for
+       // regular files whose name contains the configured file's base name and whose
+       // mtime is not older than the current file's. Switch to the one that sorts
+       // directly after the current file (position reset to 0, state persisted).
+       struct stat statbuf;
+       std::vector<TailMatchedFileItem> matchedFiles;
+       std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+
+       if (stat(fullPath.c_str(), &statbuf) != 0)
+               return;
+       // There is still unread input in the current tail file; keep tailing it.
+       if ((uint64_t) statbuf.st_size > (uint64_t) this->_currentTailFilePosition)
+               return;
+
+       uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 1000);
+       // Candidate files must contain the configured name without its last extension.
+       // A leading dot (e.g. ".log") is not treated as an extension separator; otherwise
+       // the pattern would be empty and match every file in the directory.
+       std::string pattern = _fileName;
+       std::size_t found = _fileName.find_last_of(".");
+       if (found != std::string::npos && found > 0)
+               pattern = _fileName.substr(0, found);
+
+       DIR *d = opendir(this->_fileLocation.c_str());
+       if (!d)
+               return;
+       for (struct dirent *entry = readdir(d); entry != NULL; entry = readdir(d))
+       {
+               std::string fileName = entry->d_name;
+               // Match on the entry name only. Matching on the full path made every file
+               // qualify whenever the directory path itself contained the pattern.
+               if (fileName.find(pattern) == std::string::npos)
+                       continue;
+               std::string fileFullName = this->_fileLocation + "/" + fileName;
+               // Classify with stat() rather than d_type. d_type is an enumerated value,
+               // not a bit mask, and may be DT_UNKNOWN on some file systems, so
+               // directories could previously slip through as roll-over targets.
+               if (stat(fileFullName.c_str(), &statbuf) != 0 || !S_ISREG(statbuf.st_mode))
+                       continue;
+               uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
+               if (modifiedTime >= modifiedTimeCurrentTailFile)
+               {
+                       TailMatchedFileItem item;
+                       item.fileName = fileName;
+                       item.modifiedTime = modifiedTime;
+                       matchedFiles.push_back(item);
+               }
+       }
+       closedir(d);
+
+       // Oldest first: the entry right after the current file is the roll-over target.
+       // NOTE(review): mtime has one-second resolution and std::sort is not stable, so
+       // files modified within the same second may be ordered arbitrarily.
+       std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem);
+       for (std::size_t i = 0; i < matchedFiles.size(); ++i)
+       {
+               if (matchedFiles[i].fileName != _currentTailFileName)
+                       continue;
+               if (i + 1 < matchedFiles.size())
+               {
+                       const TailMatchedFileItem &nextItem = matchedFiles[i + 1];
+                       _logger->log_info("TailFile File Roll Over from %s to %s", _currentTailFileName.c_str(), nextItem.fileName.c_str());
+                       _currentTailFileName = nextItem.fileName;
+                       _currentTailFilePosition = 0;
+                       storeState();
+               }
+               break;
+       }
+}
+
+
+void TailFile::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+       // Emits one flow file containing the bytes appended to the tailed file since
+       // the last stored position, then advances and persists that position.
+       std::string value;
+       if (context->getProperty(FileName.getName(), value))
+       {
+               std::size_t found = value.find_last_of("/\\");
+               if (found == std::string::npos)
+               {
+                       // Bare file name: resolve it against the working directory. Without
+                       // this branch the path became "<name>/<name>".
+                       this->_fileLocation = ".";
+                       this->_fileName = value;
+               }
+               else
+               {
+                       this->_fileLocation = value.substr(0, found);
+                       this->_fileName = value.substr(found + 1);
+               }
+       }
+       if (context->getProperty(StateFile.getName(), value))
+       {
+               _stateFile = value;
+       }
+       if (!this->_stateRecovered)
+       {
+               _stateRecovered = true;
+               this->_currentTailFileName = _fileName;
+               this->_currentTailFilePosition = 0;
+               // recover the state if we have not done so
+               this->recoverState();
+       }
+       checkRollOver();
+       std::string fullPath = this->_fileLocation + "/" + _currentTailFileName;
+       struct stat statbuf;
+       if (stat(fullPath.c_str(), &statbuf) != 0)
+       {
+               // The file to tail is missing; yield instead of re-triggering immediately.
+               context->yield();
+               return;
+       }
+       // NOTE(review): a file truncated below the stored position is treated as having
+       // no new input until it grows past that position again — confirm intended.
+       if ((uint64_t) statbuf.st_size <= (uint64_t) this->_currentTailFilePosition)
+       {
+               // there are no new input for the current tail file
+               context->yield();
+               return;
+       }
+       FlowFileRecord *flowFile = session->create();
+       if (!flowFile)
+               return;
+       flowFile->updateAttribute(PATH, _fileLocation);
+       flowFile->addAttribute(ABSOLUTE_PATH, fullPath);
+       // Import from the stored offset to end of file and keep the source file
+       // (keepSource == true).
+       session->import(fullPath, flowFile, true, this->_currentTailFilePosition);
+
+       // FILENAME is "<base>.<start>-<end>.<ext>", or "<name>.<start>-<end>" when the
+       // tailed file has no extension. Previously a missing extension duplicated the
+       // name ("name.0-10.name").
+       std::string range = std::to_string(_currentTailFilePosition) + "-" +
+                       std::to_string(_currentTailFilePosition + flowFile->getSize());
+       std::string logName;
+       std::size_t dot = _currentTailFileName.find_last_of(".");
+       if (dot == std::string::npos)
+               logName = _currentTailFileName + "." + range;
+       else
+               logName = _currentTailFileName.substr(0, dot) + "." + range + "." + _currentTailFileName.substr(dot + 1);
+       flowFile->updateAttribute(FILENAME, logName);
+       session->transfer(flowFile, Success);
+       // %llu with an explicit cast: the size is 64-bit, so the previous %d was undefined behavior.
+       _logger->log_info("TailFile %s for %llu bytes", _currentTailFileName.c_str(), (unsigned long long) flowFile->getSize());
+       this->_currentTailFilePosition += flowFile->getSize();
+       storeState();
+}
+

Reply via email to