Hi Joe,

I just completed my test with 100 GB of data (on a local RAID 5 disk on a
single server).

I was able to load 100 GB of data within 15 minutes (awesome!!) using the
flow below. This throughput is enough to load about 10 TB of data in a day
on a single, simple machine.
During the test, server disk I/O went up to 200 MB/s.
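
As a quick sanity check on those numbers, here is a small sketch of the arithmetic. It assumes decimal units (1 GB = 1000 MB) and a rate that holds steady for 24 hours; the class and method names are made up for illustration. With 100 GB in 15 minutes it gives about 111 MB/s on average, or about 9.6 TB per day.

```java
// Back-of-the-envelope throughput check.
// Assumption: decimal units (1 GB = 1000 MB, 1 TB = 1,000,000 MB).
final class ThroughputEstimate {

    /** Average rate in MB/s for a given volume (GB) and duration (seconds). */
    static double megabytesPerSecond(double gigabytes, double seconds) {
        return gigabytes * 1000.0 / seconds;
    }

    /** Volume in TB moved in 24 hours at a sustained rate given in MB/s. */
    static double terabytesPerDay(double megabytesPerSecond) {
        return megabytesPerSecond * 86_400 / 1_000_000.0;
    }
}
```

For example, `ThroughputEstimate.terabytesPerDay(ThroughputEstimate.megabytesPerSecond(100, 15 * 60))` evaluates to roughly 9.6.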

    ExecuteProcess (touch and mv to input dir) > ListFile >
    FetchFile (4 threads) > PutHDFS (4 threads)
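
For anyone who wants to do the "touch and mv" step from Java rather than from a shell command, a minimal sketch could look like the one below. It is only an illustration of the idea (refresh the last modified time, then move the file into the directory ListFile watches); the class and method names are hypothetical, and it is not how ExecuteProcess itself is configured. It assumes the source and the input directory are on the same filesystem, so the move is a cheap rename that keeps the refreshed timestamp.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

// Hypothetical helper: mimics "touch <file> && mv <file> <inputDir>/".
final class TouchAndMove {

    /** Refreshes the file's last modified time, then moves it into inputDir. */
    static Path touchAndMove(Path source, Path inputDir) throws IOException {
        // "touch": set the last modified time to now so that a listing based
        // on modification time sees the file as new.
        Files.setLastModifiedTime(source, FileTime.fromMillis(System.currentTimeMillis()));

        // "mv": on the same filesystem this is a rename, so no data is copied.
        Path target = inputDir.resolve(source.getFileName());
        return Files.move(source, target);
    }
}
```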

My next step is to incorporate my Java code for column masking into a
custom processor.
I am exploring that now. However, if you have any good reference on
custom processors (ones that alter the actual data), please let me know.
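
To make the idea concrete, here is a rough sketch of the kind of stream-based masking code that could sit inside such a processor. It uses only the JDK. The class name, the "****" mask value, and the simple delimited format (no quoted fields, UTF-8 text) are my assumptions for illustration, not the real masking logic. As far as I understand the NiFi API, a processor would call something like this from `session.write(flowFile, (in, out) -> masker.mask(in, out))`, so the data is streamed and never held in memory in full.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical masker: replaces selected columns of delimited text with "****".
final class ColumnMasker {
    private static final String MASK = "****";

    private final String delimiter;
    private final Set<Integer> maskedColumns; // zero-based column indexes

    ColumnMasker(String delimiter, Set<Integer> maskedColumns) {
        this.delimiter = delimiter;
        this.maskedColumns = maskedColumns;
    }

    /** Reads lines from in, masks the configured columns, writes to out. */
    void mask(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        BufferedWriter writer =
                new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            // Limit -1 keeps trailing empty fields; the delimiter is matched literally.
            String[] fields = line.split(Pattern.quote(delimiter), -1);
            for (int i = 0; i < fields.length; i++) {
                if (maskedColumns.contains(i)) {
                    fields[i] = MASK;
                }
            }
            writer.write(String.join(delimiter, fields));
            writer.write('\n'); // every output line ends with a newline
        }
        // Flush but do not close: the caller owns both streams.
        writer.flush();
    }
}
```

Because it works line by line on an InputStream and an OutputStream, only one line is held in memory at a time, whatever the file size.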

Thanks,
Obaid



On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidc...@gmail.com> wrote:

> Hi Joe,
>
> Yes, a symlink is another option I was considering when I was trying to
> use GetFile.
> Thanks for your insights. I will update this mail chain when my entire
> workflow is complete, so that it can be a reference for others :).
>
> -Obaid
>
> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
>
>> Obaid,
>>
>> You make a great point.
>>
>> I agree we will ultimately need to do more to make that very valid
>> approach work easily.  The downside is that puts the onus on NiFi to
>> keep track of a variety of potentially quite large state about the
>> directory.  One way to avoid that expense is if NiFi can pull a copy
>> and then delete the source file.  If you'd like to keep a copy around I
>> wonder if a good approach is to simply create a symlink to the
>> original file you want NiFi to pull but have the symlink in the NiFi
>> pickup directory.  NiFi is then free to read and delete which means it
>> simply pulls whatever shows up in that directory and doesn't have to
>> keep state about filenames and checksums.
>>
>> I realize we still need to do what you're suggesting as well but
>> thought I'd run this by you.
>>
>> Joe
>>
>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidc...@gmail.com>
>> wrote:
>> > Hi Joe,
>> >
>> > Consider a scenario where we need to feed some older files and we are
>> > using "mv" to feed files to the input directory (to reduce I/O we may
>> > use "mv"). If we use "mv", the last modified date will not change. And
>> > this is very common on a busy file collection system.
>> >
>> > However, I think I can still manage it by adding an additional "touch"
>> > before moving the file into the target directory.
>> >
>> > So, my suggestion is to add the file selection criteria as a
>> > configurable option in the ListFile processor. Options could be last
>> > modified date (as it is currently), unique file names, checksum, etc.
>> >
>> > Thanks again man.
>> > -Obaid
>> >
>> >
>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
>> >>
>> >> Hello Obaid,
>> >>
>> >> The default behavior of the ListFile processor is to keep track of the
>> >> last modified time of the files it lists.  When you changed the name
>> >> of the file that doesn't change the last modified time as tracked by
>> >> the OS but when you altered content it does.  Simply 'touch' on the
>> >> file would do it too.
>> >>
>> >> I believe we could observe the last modified time of the directory in
>> >> which the file lives to detect something like a rename.  However, we'd
>> >> not know which file was renamed, just that something was changed.  So
>> >> it would require keeping some potentially problematic state to
>> >> deconflict, or require the user to have a duplicate detection process
>> >> afterwards.
>> >>
>> >> So with that in mind is the current behavior sufficient for your case?
>> >>
>> >> Thanks
>> >> Joe
>> >>
>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <obaidc...@gmail.com> wrote:
>> >> > Hi Joe,
>> >> >
>> >> > I am now exploring your solution,
>> >> > starting with the flow below:
>> >> >
>> >> > ListFile > FetchFile > CompressContent > PutFile
>> >> >
>> >> > All seems fine, except for some confusion about how ListFile
>> >> > identifies new files.
>> >> > To test, I renamed an already processed file, put it in the input
>> >> > folder, and found that the file was not processed.
>> >> > Then I randomly changed the content of the file and it was
>> >> > immediately processed.
>> >> >
>> >> > My question is: what is the new file selection criteria for
>> >> > "ListFile"? Can I change it to use only the file name?
>> >> >
>> >> > Thanks in advance.
>> >> >
>> >> > -Obaid
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt <joe.w...@gmail.com> wrote:
>> >> >>
>> >> >> Hello Obaid,
>> >> >>
>> >> >> At 6 TB/day and an average size of 2-3 GB per dataset you're looking
>> >> >> at a sustained rate of 70+ MB/s and a pretty low transaction rate.
>> >> >> So well within a good range to work with on a single system.
>> >> >>
>> >> >> "Is there any way to bypass writing flow files to disk, or to
>> >> >> directly pass those files to HDFS as they are?"
>> >> >>
>> >> >>   There is no way to bypass NiFi taking a copy of that data, by
>> >> >> design.  NiFi is helping you formulate a graph of dataflow
>> >> >> requirements from a given source(s) through given processing steps
>> >> >> and ultimately driving data into given destination systems.  As a
>> >> >> result it takes on the challenge of handling transactionality of
>> >> >> each interaction and the buffering and backpressure to deal with
>> >> >> the realities of different production/consumption patterns.
>> >> >>
>> >> >> "If the files on the spool directory are compressed (zip/gzip), can
>> >> >> we store the files on HDFS uncompressed?"
>> >> >>
>> >> >>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
>> >> >> out of the box.  You simply run the data through the proper process
>> >> >> prior to the PutHDFS process to unpack (zip) or decompress (gzip) as
>> >> >> needed.
>> >> >>
>> >> >> "2.a Can we use our existing Java code for masking? If yes, then
>> >> >> how?
>> >> >> 2.b For this scenario we also want to bypass storing flow files on
>> >> >> disk. Can we do it on the fly, masking and storing on HDFS?
>> >> >> 2.c If the source files are compressed (zip/gzip), is there any
>> >> >> issue for masking here?"
>> >> >>
>> >> >>   You would build a custom NiFi processor that leverages your
>> >> >> existing code.  If your code is able to operate on an InputStream
>> >> >> and write to an OutputStream then it is very likely you'll be able
>> >> >> to handle arbitrarily large objects with zero negative impact to
>> >> >> the JVM heap as well.  This is thanks to the fact that the data is
>> >> >> present in NiFi's repository with copy-on-write/pass-by-reference
>> >> >> semantics and that the API is exposing those streams to your code
>> >> >> in a transactional manner.
>> >> >>
>> >> >>   If you want the process of writing to HDFS to also do
>> >> >> decompression and masking in one pass you'll need to extend/alter
>> >> >> the PutHDFS process to do that.  It is probably best to implement
>> >> >> the flow using cohesive processors (grab files, decompress files,
>> >> >> mask files, write to HDFS).  Given how the repository construct in
>> >> >> NiFi works and given how caching in Linux works it is very possible
>> >> >> you'll be quite surprised by the throughput you'll see.  Even then
>> >> >> you can optimize once you're sure you need to.  The other thing to
>> >> >> keep in mind here is that often a flow that starts out as specific
>> >> >> as this turns into a great place to tap the stream of data to feed
>> >> >> some new system or new algorithm with a different format or
>> >> >> protocol.  At that moment the benefits become even more obvious.
>> >> >>
>> >> >> Regarding the Flume processes in NiFi and their memory usage.  NiFi
>> >> >> offers a nice hosting mechanism for the Flume processes and brings
>> >> >> some of the benefits of NiFi's UI, provenance, repository concept.
>> >> >> However, we're still largely limited to the design assumptions one
>> >> >> gets when building a Flume process and that can be quite memory
>> >> >> limiting.  We see what we have today as a great way to help people
>> >> >> transition their existing Flume flows into NiFi by leveraging their
>> >> >> existing code but would recommend working to phase the use of those
>> >> >> out in time so that you can take full benefit of what NiFi brings
>> >> >> over Flume.
>> >> >>
>> >> >> Thanks
>> >> >> Joe
>> >> >>
>> >> >>
>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim <obaidc...@gmail.com>
>> >> >> wrote:
>> >> >> > Hi,
>> >> >> >
>> >> >> > I am new to NiFi and am exploring it as an open source ETL tool.
>> >> >> >
>> >> >> > As per my understanding, flow files are stored on local disk and
>> >> >> > contain the actual data.
>> >> >> > If the above is true, let's consider the scenarios below:
>> >> >> >
>> >> >> > Scenario 1:
>> >> >> > - In a spool directory we have terabytes (5-6 TB/day) of files
>> >> >> >   coming from external sources
>> >> >> > - I want to push those files to HDFS as they are, without any
>> >> >> >   changes
>> >> >> >
>> >> >> > Scenario 2:
>> >> >> > - In a spool directory we have terabytes (5-6 TB/day) of files
>> >> >> >   coming from external sources
>> >> >> > - I want to mask some of the sensitive columns
>> >> >> > - Then send one copy to HDFS and another copy to Kafka
>> >> >> >
>> >> >> > Questions for Scenario 1:
>> >> >> > 1.a In that case those 5-6 TB of data will be written again to
>> >> >> > local disk as flow files, causing double I/O, which may eventually
>> >> >> > slow performance due to an I/O bottleneck.
>> >> >> > Is there any way to bypass writing flow files to disk, or to
>> >> >> > directly pass those files to HDFS as they are?
>> >> >> > 1.b If the files on the spool directory are compressed (zip/gzip),
>> >> >> > can we store the files on HDFS uncompressed?
>> >> >> >
>> >> >> > Questions for Scenario 2:
>> >> >> > 2.a Can we use our existing Java code for masking? If yes, then
>> >> >> > how?
>> >> >> > 2.b For this scenario we also want to bypass storing flow files on
>> >> >> > disk. Can we do it on the fly, masking and storing on HDFS?
>> >> >> > 2.c If the source files are compressed (zip/gzip), is there any
>> >> >> > issue for masking here?
>> >> >> >
>> >> >> >
>> >> >> > In fact, I tried the above using Flume + Flume interceptors.
>> >> >> > Everything works fine with smaller files, but when source files
>> >> >> > are greater than 50 MB Flume chokes :(.
>> >> >> > So I am exploring options in NiFi. I hope to get some guidance
>> >> >> > from you guys.
>> >> >> >
>> >> >> >
>> >> >> > Thanks in advance.
>> >> >> > -Obaid
>> >> >
>> >> >
>>
>
