Obaid,

I can't say for sure how much this would improve performance, but you might
want to wrap the OutputStream with BufferedOutputStream or BufferedWriter.
Would be curious to here if that helps.

A similar scenario from the standard processors is ReplaceText, here is one
example where it uses the StreamCallback:
https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L337

-Bryan

On Tue, Jan 12, 2016 at 8:38 PM, obaidul karim <obaidc...@gmail.com> wrote:

> Hi Joe,
>
> Yes, I took consideration of existinh RAID and HW settings. We have 10G
> NIC for all hadoop intra-connectivity and the server in question is an edge
> node of our hadoop cluster.
> In production scenario we will use dedicated ETL servers having high
> performance(>500MB/s) local disks.
>
> Sharing a good news, I have successfully mask & load to HDFS 110 GB data
> using below flow:
>
> ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) >
> FetchFile (1 thread) > maskColumn(4 threads) > PutHDFS (1 threads).
>
> * used 4 threads for masking and 1 for other because I found it is the
> slowest component.
>
> However, It seems to be too slow. It was processing 2GB files in  6
> minutes. It may be because of my masking algorithm(although masking
> algorithm is pretty simple FPE with some simple twist).
> However I want to be sure that the way I have written custom processor is
> the most efficient way. Please below code chunk and let me know whether it
> is the fastest way to process flowfiles (csv source files) which needs
> modifications on specific columns:
>
> * parseLine method contains logic for masking.
>
>        flowfile = session.write(flowfile, new StreamCallback() {
>         @Override
>            public void process(InputStream in, OutputStream out) throws
> IOException {
>
>         BufferedReader reader = new BufferedReader(new
> InputStreamReader(in));
>         String line;
>         if(skipHeader == true && headerExists==true) { // to skip header,
> do an additional line fetch before going to next step
>         if(reader.ready())   reader.readLine();
>         } else if( skipHeader == false && headerExists == true) { // if
> header is not skipped then no need to mask, just pass through
>         if(reader.ready())
> out.write((reader.readLine()+"\n").getBytes());
>         }
>
>         // decide about empty line earlier
>         while ((line = reader.readLine()) != null) {
>         if(line.trim().length() > 0 ) {
>         out.write( parseLine(line, seperator, quote, escape,
> maskColumns).getBytes() );
>         }
> };
> out.flush();
>            }
>        });
>
>
>
>
> Thanks in advance.
> -Obaid
>
>
> On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.w...@gmail.com> wrote:
>
>> Obaid,
>>
>> Really happy you're seeing the performance you need.  That works out
>> to about 110MB/s on average over that period.  Any chance you have a
>> 1GB NIC?  If you really want to have fun with performance tuning you
>> can use things like iostat and other commands to observe disk,
>> network, cpu.  Something else to consider too is the potential
>> throughput gains of multiple RAID-1 containers rather than RAID-5
>> since NiFi can use both in parallel.  Depends on your goals/workload
>> so just an FYI.
>>
>> A good reference for how to build a processor which does altering of
>> the data (transformation) is here [1].  It is a good idea to do a
>> quick read through that document.  Also, one of the great things you
>> can do as well is look at existing processors.  Some good examples
>> relevant to transformation are [2], [3], and [4] which are quite
>> simple stream transform types. Or take a look at [5] which is a more
>> complicated example.  You might also be excited to know that there is
>> some really cool work done to bring various languages into NiFi which
>> looks on track to be available in the upcoming 0.5.0 release which is
>> NIFI-210 [6].  That will provide a really great option to quickly
>> build transforms using languages like Groovy, JRuby, Javascript,
>> Scala, Lua, Javascript, and Jython.
>>
>> [1]
>> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
>>
>> [2]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
>>
>> [3]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
>>
>> [4]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
>>
>> [5]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
>>
>> [6] https://issues.apache.org/jira/browse/NIFI-210
>>
>> Thanks
>> Joe
>>
>> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim <obaidc...@gmail.com>
>> wrote:
>> > Hi Joe,
>> >
>> > Just completed by test with 100GB data (on a local RAID 5 disk on a
>> single
>> > server).
>> >
>> > I was able to load 100GB data within 15 minutes(awesome!!) using below
>> flow.
>> > This throughput is enough to load 10TB data in a day with a single and
>> > simple machine.
>> > During the test, server disk I/O went up to 200MB/s.
>> >
>> >     ExecuteProcess(touch and mv to input dir) > ListFile > FetchFile (4
>> > threads) > PutHDFS (4 threads)
>> >
>> > My Next action is to incorporate my java code for column masking with a
>> > custom processor.
>> > I am now exploring on that. However, if you have any good reference on
>> > custom processor(altering actual data) please let  me know.
>> >
>> > Thanks,
>> > Obaid
>> >
>> >
>> >
>> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidc...@gmail.com>
>> wrote:
>> >>
>> >> Hi Joe,
>> >>
>> >> Yes, symlink is another option I was thinking when I was trying to use
>> >> getfile.
>> >> Thanks for your insights, I will update you on this mail chain when my
>> >> entire workflow completes. So that thus could be an reference for
>> other :).
>> >>
>> >> -Obaid
>> >>
>> >> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
>> >>>
>> >>> Obaid,
>> >>>
>> >>> You make a great point.
>> >>>
>> >>> I agree we will ultimately need to do more to make that very valid
>> >>> approach work easily.  The downside is that puts the onus on NiFi to
>> >>> keep track of a variety of potentially quite large state about the
>> >>> directory.  One way to avoid that expense is if NiFi can pull a copy
>> >>> of then delete the source file.  If you'd like to keep a copy around I
>> >>> wonder if a good approach is to simply create a symlink to the
>> >>> original file you want NiFi to pull but have the symlink in the NiFi
>> >>> pickup directory.  NiFi is then free to read and delete which means it
>> >>> simply pulls whatever shows up in that directory and doesn't have to
>> >>> keep state about filenames and checksums.
>> >>>
>> >>> I realize we still need to do what you're suggesting as well but
>> >>> thought I'd run this by you.
>> >>>
>> >>> Joe
>> >>>
>> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidc...@gmail.com>
>> >>> wrote:
>> >>> > Hi Joe,
>> >>> >
>> >>> > Condider a scenerio, where we need to feed some older files and we
>> are
>> >>> > using
>> >>> > "mv" to feed files to input directory( to reduce IO we may use
>> "mv").
>> >>> > If we
>> >>> > use "mv", last modified date will not changed. And this is very
>> common
>> >>> > on a
>> >>> > busy file collection system.
>> >>> >
>> >>> > However, I think I can still manage it by adding additional "touch"
>> >>> > before
>> >>> > moving fole in the target directory.
>> >>> >
>> >>> > So, my suggestion is to add file selection criteria as an
>> configurable
>> >>> > option in listfile process on workflow. Options could be last
>> modified
>> >>> > date(as current one) unique file names, checksum etc.
>> >>> >
>> >>> > Thanks again man.
>> >>> > -Obaid
>> >>> >
>> >>> >
>> >>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
>> >>> >>
>> >>> >> Hello Obaid,
>> >>> >>
>> >>> >> The default behavior of the ListFile processor is to keep track of
>> the
>> >>> >> last modified time of the files it lists.  When you changed the
>> name
>> >>> >> of the file that doesn't change the last modified time as tracked
>> by
>> >>> >> the OS but when you altered content it does.  Simply 'touch' on the
>> >>> >> file would do it too.
>> >>> >>
>> >>> >> I believe we could observe the last modified time of the directory
>> in
>> >>> >> which the file lives to detect something like a rename.  However,
>> we'd
>> >>> >> not know which file was renamed just that something was changed.
>> So
>> >>> >> it require keeping some potentially problematic state to
>> deconflict or
>> >>> >> requiring the user to have a duplicate detection process
>> afterwards.
>> >>> >>
>> >>> >> So with that in mind is the current behavior sufficient for your
>> case?
>> >>> >>
>> >>> >> Thanks
>> >>> >> Joe
>> >>> >>
>> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <obaidc...@gmail.com
>> >
>> >>> >> wrote:
>> >>> >> > Hi Joe,
>> >>> >> >
>> >>> >> > I am now exploring your solution.
>> >>> >> > Starting with below flow:
>> >>> >> >
>> >>> >> > ListFIle > FetchFile > CompressContent > PutFile.
>> >>> >> >
>> >>> >> > Seems all fine. Except some confusion with how ListFile
>> identifies
>> >>> >> > new
>> >>> >> > files.
>> >>> >> > In order to test, I renamed a already processed file and put in
>> in
>> >>> >> > input
>> >>> >> > folder and found that the file is not processing.
>> >>> >> > Then I randomly changed the content of the file and it was
>> >>> >> > immediately
>> >>> >> > processed.
>> >>> >> >
>> >>> >> > My question is what is the new file selection criteria for
>> >>> >> > "ListFile" ?
>> >>> >> > Can
>> >>> >> > I change it only to file name ?
>> >>> >> >
>> >>> >> > Thanks in advance.
>> >>> >> >
>> >>> >> > -Obaid
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> >
>> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt <joe.w...@gmail.com>
>> >>> >> > wrote:
>> >>> >> >>
>> >>> >> >> Hello Obaid,
>> >>> >> >>
>> >>> >> >> At 6 TB/day and average size of 2-3GB per dataset you're
>> looking at
>> >>> >> >> a
>> >>> >> >> sustained rate of 70+MB/s and a pretty low transaction rate.  So
>> >>> >> >> well
>> >>> >> >> within a good range to work with on a single system.
>> >>> >> >>
>> >>> >> >> 'I's there any way to by pass writing flow files on disk or
>> >>> >> >> directly
>> >>> >> >> pass those files to HDFS as it is ?"
>> >>> >> >>
>> >>> >> >>   There is no way to bypass NiFi taking a copy of that data by
>> >>> >> >> design.
>> >>> >> >> NiFi is helping you formulate a graph of dataflow requirements
>> from
>> >>> >> >> a
>> >>> >> >> given source(s) through given processing steps and ultimate
>> driving
>> >>> >> >> data into given destination systems.  As a result it takes on
>> the
>> >>> >> >> challenge of handling transactionality of each interaction and
>> the
>> >>> >> >> buffering and backpressure to deal with the realities of
>> different
>> >>> >> >> production/consumption patterns.
>> >>> >> >>
>> >>> >> >> "If the files on the spool directory are compressed(zip/gzip),
>> can
>> >>> >> >> we
>> >>> >> >> store files on HDFS as uncompressed ?"
>> >>> >> >>
>> >>> >> >>   Certainly.  Both of those formats (zip/gzip) are supported in
>> >>> >> >> NiFi
>> >>> >> >> out of the box.  You simply run the data through the proper
>> process
>> >>> >> >> prior to the PutHDFS process to unpack (zip) or decompress
>> (gzip)
>> >>> >> >> as
>> >>> >> >> needed.
>> >>> >> >>
>> >>> >> >> "2.a Can we use our existing java code for masking ? if yes then
>> >>> >> >> how ?
>> >>> >> >> 2.b For this Scenario we also want to bypass storing flow files
>> on
>> >>> >> >> disk. Can we do it on the fly, masking and storing on HDFS ?
>> >>> >> >> 2.c If the source files are compressed (zip/gzip), is there any
>> >>> >> >> issue
>> >>> >> >> for masking here ?"
>> >>> >> >>
>> >>> >> >>   You would build a custom NiFi processor that leverages your
>> >>> >> >> existing
>> >>> >> >> code.  If your code is able to operate on an InputStream and
>> writes
>> >>> >> >> to
>> >>> >> >> an OutputStream then it is very likely you'll be able to handle
>> >>> >> >> arbitrarily large objects with zero negative impact to the JVM
>> Heap
>> >>> >> >> as
>> >>> >> >> well.  This is thanks to the fact that the data is present in
>> >>> >> >> NiFi's
>> >>> >> >> repository with copy-on-write/pass-by-reference semantics and
>> that
>> >>> >> >> the
>> >>> >> >> API is exposing those streams to your code in a transactional
>> >>> >> >> manner.
>> >>> >> >>
>> >>> >> >>   If you want the process of writing to HDFS to also do
>> >>> >> >> decompression
>> >>> >> >> and masking in one pass you'll need to extend/alter the PutHDFS
>> >>> >> >> process to do that.  It is probably best to implement the flow
>> >>> >> >> using
>> >>> >> >> cohesive processors (grab files, decompress files, mask files,
>> >>> >> >> write
>> >>> >> >> to hdfs).  Given how the repository construct in NiFi works and
>> >>> >> >> given
>> >>> >> >> how caching in Linux works it is very possible you'll be quite
>> >>> >> >> surprised by the throughput you'll see.  Even then you can
>> optimize
>> >>> >> >> once you're sure you need to.  The other thing to keep in mind
>> here
>> >>> >> >> is
>> >>> >> >> that often a flow that starts out as specific as this turns
>> into a
>> >>> >> >> great place to tap the stream of data to feed some new system or
>> >>> >> >> new
>> >>> >> >> algorithm with a different format or protocol.  At that moment
>> the
>> >>> >> >> benefits become even more obvious.
>> >>> >> >>
>> >>> >> >> Regarding the Flume processes in NiFi and their memory usage.
>> NiFi
>> >>> >> >> offers a nice hosting mechanism for the Flume processes and
>> brings
>> >>> >> >> some of the benefits of NiFi's UI, provenance, repository
>> concept.
>> >>> >> >> However, we're still largely limited to the design assumptions
>> one
>> >>> >> >> gets when building a Flume process and that can be quite memory
>> >>> >> >> limiting.  We see what we have today as a great way to help
>> people
>> >>> >> >> transition their existing Flume flows into NiFi by leveraging
>> their
>> >>> >> >> existing code but would recommend working to phase the use of
>> those
>> >>> >> >> out in time so that you can take full benefit of what NiFi
>> brings
>> >>> >> >> over
>> >>> >> >> Flume.
>> >>> >> >>
>> >>> >> >> Thanks
>> >>> >> >> Joe
>> >>> >> >>
>> >>> >> >>
>> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim <
>> obaidc...@gmail.com>
>> >>> >> >> wrote:
>> >>> >> >> > Hi,
>> >>> >> >> >
>> >>> >> >> > I am new in Nifi and exploring it as open source ETL tool.
>> >>> >> >> >
>> >>> >> >> > As per my understanding, flow files are stored on local disk
>> and
>> >>> >> >> > it
>> >>> >> >> > contains
>> >>> >> >> > actual data.
>> >>> >> >> > If above is true, lets consider a below scenario:
>> >>> >> >> >
>> >>> >> >> > Scenario 1:
>> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day) of files
>> >>> >> >> > coming
>> >>> >> >> > from
>> >>> >> >> > external sources
>> >>> >> >> > - I want to push those files to HDFS as it is without any
>> changes
>> >>> >> >> >
>> >>> >> >> > Scenario 2:
>> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day) of files
>> >>> >> >> > coming
>> >>> >> >> > from
>> >>> >> >> > external sources
>> >>> >> >> > - I want to mask some of the sensitive columns
>> >>> >> >> > - Then send one copy to HDFS and another copy to Kafka
>> >>> >> >> >
>> >>> >> >> > Question for Scenario 1:
>> >>> >> >> > 1.a In that case those 5-6TB data will be again written on
>> local
>> >>> >> >> > disk
>> >>> >> >> > as
>> >>> >> >> > flow files and will cause double I/O. Which eventually may
>> cause
>> >>> >> >> > slower
>> >>> >> >> > performance due to I/O bottleneck.
>> >>> >> >> > Is there any way to by pass writing flow files on disk or
>> >>> >> >> > directly
>> >>> >> >> > pass
>> >>> >> >> > those files to HDFS as it is ?
>> >>> >> >> > 1.b If the files on the spool directory are
>> compressed(zip/gzip),
>> >>> >> >> > can
>> >>> >> >> > we
>> >>> >> >> > store files on HDFS as uncompressed ?
>> >>> >> >> >
>> >>> >> >> > Question for Scenario 2:
>> >>> >> >> > 2.a Can we use our existing java code for masking ? if yes
>> then
>> >>> >> >> > how ?
>> >>> >> >> > 2.b For this Scenario we also want to bypass storing flow
>> files
>> >>> >> >> > on
>> >>> >> >> > disk.
>> >>> >> >> > Can
>> >>> >> >> > we do it on the fly, masking and storing on HDFS ?
>> >>> >> >> > 2.c If the source files are compressed (zip/gzip), is there
>> any
>> >>> >> >> > issue
>> >>> >> >> > for
>> >>> >> >> > masking here ?
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> > In fact, I tried above using flume+flume interceptors.
>> Everything
>> >>> >> >> > working
>> >>> >> >> > fine with smaller files. But when source files greater that
>> 50MB
>> >>> >> >> > flume
>> >>> >> >> > chocks :(.
>> >>> >> >> > So, I am exploring options in NiFi. Hope I will get some
>> >>> >> >> > guideline
>> >>> >> >> > from
>> >>> >> >> > you
>> >>> >> >> > guys.
>> >>> >> >> >
>> >>> >> >> >
>> >>> >> >> > Thanks in advance.
>> >>> >> >> > -Obaid
>> >>> >> >
>> >>> >> >
>> >
>> >
>>
>
>

Reply via email to