Hi Joe & Others,

Thanks for all of your suggestions.

I am now using the code below, with these changes:
1. Buffered reader (I tried to use NLKBufferedReader, but it requires too
many libs and NiFi failed to start. I was lost.)
2. Buffered writer
3. Writing the line ending as a separate write instead of concatenating a
newline onto each line

Still no performance gain. Am I doing something wrong? Is there anything
else I can change here?

flowfile = session.write(flowfile, new StreamCallback() {
    @Override
    public void process(InputStream in, OutputStream out) throws IOException {
        // Build the line ending once instead of once per line.
        final String lineEnding = lineEndingBuilder.toString();
        try (BufferedReader reader = new BufferedReader(
                 new InputStreamReader(in, charset), maxBufferSize);
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(out, charset))) {

            if (headerExists) {
                // Consume the header line. readLine() returns null at end of
                // stream; reader.ready() can be false even when data remains.
                String header = reader.readLine();
                if (header != null && !skipHeader) {
                    // Header is kept, so pass it through without masking.
                    writer.write(header);
                    writer.write(lineEnding);
                }
            }

            // decide about empty line earlier
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(parseLine(line, seperator, quote, escape, maskColumns));
                writer.write(lineEnding);
            }
            writer.flush();
        }
    }
});
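
One way to check whether the time goes into I/O or into parseLine is to run
the same loop outside NiFi and time the masking call separately. The block
below is only a minimal standalone sketch under stated assumptions, not the
processor itself: the class MaskLoopSketch, the method copyMasked, and
maskLine (a hypothetical stand-in for parseLine that replaces selected
columns with "****" and ignores quoting and escaping) are all made up for
illustration, and the input is a small in-memory CSV.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MaskLoopSketch {

    // Hypothetical stand-in for parseLine: replaces the listed zero-based
    // columns with "****". It does not handle quote or escape characters.
    public static String maskLine(String line, char separator, Set<Integer> maskColumns) {
        StringBuilder sb = new StringBuilder(line.length());
        int col = 0;
        int start = 0;
        while (true) {
            int end = line.indexOf(separator, start);
            String field = (end < 0) ? line.substring(start) : line.substring(start, end);
            sb.append(maskColumns.contains(col) ? "****" : field);
            if (end < 0) {
                break;
            }
            sb.append(separator);
            start = end + 1;
            col++;
        }
        return sb.toString();
    }

    // Same shape as the StreamCallback body above. Returns the nanoseconds
    // spent inside maskLine only, so it can be compared with the total time.
    public static long copyMasked(InputStream in, OutputStream out, Charset charset,
                                  boolean headerExists, boolean skipHeader,
                                  Set<Integer> maskColumns) throws IOException {
        final String lineEnding = "\n"; // assumed line ending for the sketch
        long maskNanos = 0L;
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, charset))) {
            if (headerExists) {
                String header = reader.readLine();
                if (header != null && !skipHeader) {
                    writer.write(header);
                    writer.write(lineEnding);
                }
            }
            String line;
            while ((line = reader.readLine()) != null) {
                long t0 = System.nanoTime();
                String masked = maskLine(line, ',', maskColumns);
                maskNanos += System.nanoTime() - t0;
                writer.write(masked);
                writer.write(lineEnding);
            }
        }
        return maskNanos;
    }

    public static void main(String[] args) throws IOException {
        byte[] input = "id,name,card\n1,alice,4111\n2,bob,4222\n"
                .getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Set<Integer> maskColumns = new HashSet<>(Arrays.asList(2));

        long start = System.nanoTime();
        long maskNanos = copyMasked(new ByteArrayInputStream(input), out,
                StandardCharsets.UTF_8, true, false, maskColumns);
        long totalNanos = System.nanoTime() - start;

        System.out.print(out.toString("UTF-8"));
        System.out.println("total ns: " + totalNanos + ", in maskLine ns: " + maskNanos);
    }
}
```

With a realistic file in place of the in-memory sample, a maskLine share
close to the total would suggest the masking logic is the bottleneck rather
than the reader/writer setup. The per-line System.nanoTime() calls add some
overhead of their own, so the numbers are only a rough guide.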


-Obaid

On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt <joe.w...@gmail.com> wrote:

> Hello
>
> So the performance went from what sounded pretty good to what sounds
> pretty problematic.  The rate now sounds like it is around 5MB/s which
> is indeed quite poor.  Building on what Bryan said there does appear
> to be some good opportunities to improve the performance.  The link he
> provided, expanded to cover the full range to look at, is here [1].
>
> Couple key points to note:
> 1) Use of a buffered line oriented reader that preserves the new lines
> 2) write to a buffered writer that accepts strings and understands
> which charset you intend to write out
> 3) avoid string concat with newline
>
> Also keep in mind how large any single line could be because if
> they can be quite large you may need to consider the GC pressure that
> can be caused.  But let's take a look at how things are after these
> easier steps first.
>
> [1]
> https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361
>
> Thanks
> Joe
>
> On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros <helloj...@gmail.com>
> wrote:
> > Obaid,
> >
> > Since you mention that you will have dedicated ETL servers and assume they
> > will also have a decent amount of ram on them, then I would not shy away
> > from increasing your threads.
> >
> > Also in your staging directory if you do not need to keep originals, then
> > you might consider GetFile and on that one use one thread.
> >
> > Hi Joe,
> >
> > Yes, I took the existing RAID and HW settings into consideration. We have
> > a 10G NIC for all hadoop intra-connectivity and the server in question is
> > an edge node of our hadoop cluster.
> > In production scenario we will use dedicated ETL servers having high
> > performance(>500MB/s) local disks.
> >
> > Sharing some good news: I have successfully masked and loaded 110 GB of
> > data to HDFS using the flow below:
> >
> > ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) >
> > FetchFile (1 thread) > maskColumn(4 threads) > PutHDFS (1 thread).
> >
> > * used 4 threads for masking and 1 for the others because I found masking
> > is the slowest component.
> >
> > However, it seems to be too slow. It was processing 2GB files in 6 minutes.
> > It may be because of my masking algorithm (although the masking algorithm
> > is pretty simple FPE with some simple twist).
> > However, I want to be sure that the way I have written the custom processor
> > is the most efficient way. Please see the code chunk below and let me know
> > whether it is the fastest way to process flowfiles (csv source files) which
> > need modifications on specific columns:
> >
> > * parseLine method contains logic for masking.
> >
> > flowfile = session.write(flowfile, new StreamCallback() {
> >     @Override
> >     public void process(InputStream in, OutputStream out) throws IOException {
> >
> >         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
> >         String line;
> >         if(skipHeader == true && headerExists==true) {
> >             // to skip header, do an additional line fetch before going to next step
> >             if(reader.ready())   reader.readLine();
> >         } else if( skipHeader == false && headerExists == true) {
> >             // if header is not skipped then no need to mask, just pass through
> >             if(reader.ready())  out.write((reader.readLine()+"\n").getBytes());
> >         }
> >
> >         // decide about empty line earlier
> >         while ((line = reader.readLine()) != null) {
> >             if(line.trim().length() > 0 ) {
> >                 out.write( parseLine(line, seperator, quote, escape, maskColumns).getBytes() );
> >             }
> >         };
> >         out.flush();
> >     }
> > });
> >
> >
> >
> >
> > Thanks in advance.
> > -Obaid
> >
> >
> > On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >>
> >> Obaid,
> >>
> >> Really happy you're seeing the performance you need.  That works out
> >> to about 110MB/s on average over that period.  Any chance you have a
> >> 1Gb NIC?  If you really want to have fun with performance tuning you
> >> can use things like iostat and other commands to observe disk,
> >> network, cpu.  Something else to consider too is the potential
> >> throughput gains of multiple RAID-1 containers rather than RAID-5
> >> since NiFi can use both in parallel.  Depends on your goals/workload
> >> so just an FYI.
> >>
> >> A good reference for how to build a processor which does altering of
> >> the data (transformation) is here [1].  It is a good idea to do a
> >> quick read through that document.  Also, one of the great things you
> >> can do as well is look at existing processors.  Some good examples
> >> relevant to transformation are [2], [3], and [4] which are quite
> >> simple stream transform types. Or take a look at [5] which is a more
> >> complicated example.  You might also be excited to know that there is
> >> some really cool work done to bring various languages into NiFi which
> >> looks on track to be available in the upcoming 0.5.0 release which is
> >> NIFI-210 [6].  That will provide a really great option to quickly
> >> build transforms using languages like Groovy, JRuby, Javascript,
> >> Scala, Lua, and Jython.
> >>
> >> [1]
> >>
> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
> >>
> >> [2]
> >>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
> >>
> >> [3]
> >>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
> >>
> >> [4]
> >>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
> >>
> >> [5]
> >>
> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
> >>
> >> [6] https://issues.apache.org/jira/browse/NIFI-210
> >>
> >> Thanks
> >> Joe
> >>
> >> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim <obaidc...@gmail.com>
> wrote:
> >> > Hi Joe,
> >> >
> >> > Just completed my test with 100GB data (on a local RAID 5 disk on a
> >> > single server).
> >> >
> >> > I was able to load 100GB of data within 15 minutes (awesome!!) using the
> >> > flow below.
> >> > This throughput is enough to load 10TB of data in a day with a single,
> >> > simple machine.
> >> > During the test, server disk I/O went up to 200MB/s.
> >> >
> >> >     ExecuteProcess(touch and mv to input dir) > ListFile > FetchFile
> >> > (4 threads) > PutHDFS (4 threads)
> >> >
> >> > My next action is to incorporate my java code for column masking into a
> >> > custom processor.
> >> > I am now exploring that. However, if you have any good reference on
> >> > custom processors (altering actual data) please let me know.
> >> >
> >> > Thanks,
> >> > Obaid
> >> >
> >> >
> >> >
> >> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidc...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Hi Joe,
> >> >>
> >> >> Yes, a symlink is another option I was thinking of when I was trying to
> >> >> use GetFile.
> >> >> Thanks for your insights. I will update you on this mail chain when my
> >> >> entire workflow completes, so that this could be a reference for
> >> >> others :).
> >> >>
> >> >> -Obaid
> >> >>
> >> >> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>>
> >> >>> Obaid,
> >> >>>
> >> >>> You make a great point.
> >> >>>
> >> >>> I agree we will ultimately need to do more to make that very valid
> >> >>> approach work easily.  The downside is that puts the onus on NiFi to
> >> >>> keep track of a variety of potentially quite large state about the
> >> >>> directory.  One way to avoid that expense is if NiFi can pull a copy
> >> >>> and then delete the source file.  If you'd like to keep a copy around I
> >> >>> wonder if a good approach is to simply create a symlink to the
> >> >>> original file you want NiFi to pull but have the symlink in the NiFi
> >> >>> pickup directory.  NiFi is then free to read and delete which means it
> >> >>> simply pulls whatever shows up in that directory and doesn't have to
> >> >>> keep state about filenames and checksums.
> >> >>>
> >> >>> I realize we still need to do what you're suggesting as well but
> >> >>> thought I'd run this by you.
> >> >>>
> >> >>> Joe
> >> >>>
> >> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidc...@gmail.com>
> >> >>> wrote:
> >> >>> > Hi Joe,
> >> >>> >
> >> >>> > Consider a scenario where we need to feed some older files and we are
> >> >>> > using "mv" to feed files to the input directory (to reduce IO we may
> >> >>> > use "mv"). If we use "mv", the last modified date will not change.
> >> >>> > And this is very common on a busy file collection system.
> >> >>> >
> >> >>> > However, I think I can still manage it by adding an additional "touch"
> >> >>> > before moving the file into the target directory.
> >> >>> >
> >> >>> > So, my suggestion is to add file selection criteria as a configurable
> >> >>> > option in the ListFile processor on the workflow. Options could be last
> >> >>> > modified date (as the current one), unique file names, checksum etc.
> >> >>> >
> >> >>> > Thanks again man.
> >> >>> > -Obaid
> >> >>> >
> >> >>> >
> >> >>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>> >>
> >> >>> >> Hello Obaid,
> >> >>> >>
> >> >>> >> The default behavior of the ListFile processor is to keep track of the
> >> >>> >> last modified time of the files it lists.  When you changed the name
> >> >>> >> of the file that doesn't change the last modified time as tracked by
> >> >>> >> the OS but when you altered content it does.  Simply 'touch' on the
> >> >>> >> file would do it too.
> >> >>> >>
> >> >>> >> I believe we could observe the last modified time of the directory in
> >> >>> >> which the file lives to detect something like a rename.  However, we'd
> >> >>> >> not know which file was renamed just that something was changed.  So
> >> >>> >> it would require keeping some potentially problematic state to
> >> >>> >> deconflict or requiring the user to have a duplicate detection process
> >> >>> >> afterwards.
> >> >>> >>
> >> >>> >> So with that in mind is the current behavior sufficient for your case?
> >> >>> >>
> >> >>> >> Thanks
> >> >>> >> Joe
> >> >>> >>
> >> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <
> obaidc...@gmail.com>
> >> >>> >> wrote:
> >> >>> >> > Hi Joe,
> >> >>> >> >
> >> >>> >> > I am now exploring your solution.
> >> >>> >> > Starting with the flow below:
> >> >>> >> >
> >> >>> >> > ListFile > FetchFile > CompressContent > PutFile.
> >> >>> >> >
> >> >>> >> > All seems fine, except for some confusion about how ListFile
> >> >>> >> > identifies new files.
> >> >>> >> > In order to test, I renamed an already processed file and put it in
> >> >>> >> > the input folder and found that the file was not processed.
> >> >>> >> > Then I randomly changed the content of the file and it was
> >> >>> >> > immediately processed.
> >> >>> >> >
> >> >>> >> > My question is: what is the new file selection criteria for
> >> >>> >> > "ListFile"? Can I change it to file name only?
> >> >>> >> >
> >> >>> >> > Thanks in advance.
> >> >>> >> >
> >> >>> >> > -Obaid
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> >
> >> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt <joe.w...@gmail.com>
> >> >>> >> > wrote:
> >> >>> >> >>
> >> >>> >> >> Hello Obaid,
> >> >>> >> >>
> >> >>> >> >> At 6 TB/day and average size of 2-3GB per dataset you're looking at a
> >> >>> >> >> sustained rate of 70+MB/s and a pretty low transaction rate.  So well
> >> >>> >> >> within a good range to work with on a single system.
> >> >>> >> >>
> >> >>> >> >> "Is there any way to bypass writing flow files on disk or directly
> >> >>> >> >> pass those files to HDFS as it is ?"
> >> >>> >> >>
> >> >>> >> >>   There is no way to bypass NiFi taking a copy of that data by design.
> >> >>> >> >> NiFi is helping you formulate a graph of dataflow requirements from a
> >> >>> >> >> given source(s) through given processing steps and ultimately driving
> >> >>> >> >> data into given destination systems.  As a result it takes on the
> >> >>> >> >> challenge of handling transactionality of each interaction and the
> >> >>> >> >> buffering and backpressure to deal with the realities of different
> >> >>> >> >> production/consumption patterns.
> >> >>> >> >>
> >> >>> >> >> "If the files on the spool directory are compressed(zip/gzip), can we
> >> >>> >> >> store files on HDFS as uncompressed ?"
> >> >>> >> >>
> >> >>> >> >>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
> >> >>> >> >> out of the box.  You simply run the data through the proper process
> >> >>> >> >> prior to the PutHDFS process to unpack (zip) or decompress (gzip) as
> >> >>> >> >> needed.
> >> >>> >> >>
> >> >>> >> >> "2.a Can we use our existing java code for masking ? if yes then how ?
> >> >>> >> >> 2.b For this Scenario we also want to bypass storing flow files on
> >> >>> >> >> disk. Can we do it on the fly, masking and storing on HDFS ?
> >> >>> >> >> 2.c If the source files are compressed (zip/gzip), is there any issue
> >> >>> >> >> for masking here ?"
> >> >>> >> >>
> >> >>> >> >>   You would build a custom NiFi processor that leverages your existing
> >> >>> >> >> code.  If your code is able to operate on an InputStream and writes to
> >> >>> >> >> an OutputStream then it is very likely you'll be able to handle
> >> >>> >> >> arbitrarily large objects with zero negative impact to the JVM Heap as
> >> >>> >> >> well.  This is thanks to the fact that the data is present in NiFi's
> >> >>> >> >> repository with copy-on-write/pass-by-reference semantics and that the
> >> >>> >> >> API is exposing those streams to your code in a transactional manner.
> >> >>> >> >>
> >> >>> >> >>   If you want the process of writing to HDFS to also do decompression
> >> >>> >> >> and masking in one pass you'll need to extend/alter the PutHDFS
> >> >>> >> >> process to do that.  It is probably best to implement the flow using
> >> >>> >> >> cohesive processors (grab files, decompress files, mask files, write
> >> >>> >> >> to hdfs).  Given how the repository construct in NiFi works and given
> >> >>> >> >> how caching in Linux works it is very possible you'll be quite
> >> >>> >> >> surprised by the throughput you'll see.  Even then you can optimize
> >> >>> >> >> once you're sure you need to.  The other thing to keep in mind here is
> >> >>> >> >> that often a flow that starts out as specific as this turns into a
> >> >>> >> >> great place to tap the stream of data to feed some new system or new
> >> >>> >> >> algorithm with a different format or protocol.  At that moment the
> >> >>> >> >> benefits become even more obvious.
> >> >>> >> >>
> >> >>> >> >> Regarding the Flume processes in NiFi and their memory usage.  NiFi
> >> >>> >> >> offers a nice hosting mechanism for the Flume processes and brings
> >> >>> >> >> some of the benefits of NiFi's UI, provenance, repository concept.
> >> >>> >> >> However, we're still largely limited to the design assumptions one
> >> >>> >> >> gets when building a Flume process and that can be quite memory
> >> >>> >> >> limiting.  We see what we have today as a great way to help people
> >> >>> >> >> transition their existing Flume flows into NiFi by leveraging their
> >> >>> >> >> existing code but would recommend working to phase the use of those
> >> >>> >> >> out in time so that you can take full benefit of what NiFi brings over
> >> >>> >> >> Flume.
> >> >>> >> >>
> >> >>> >> >> Thanks
> >> >>> >> >> Joe
> >> >>> >> >>
> >> >>> >> >>
> >> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim
> >> >>> >> >> <obaidc...@gmail.com>
> >> >>> >> >> wrote:
> >> >>> >> >> > Hi,
> >> >>> >> >> >
> >> >>> >> >> > I am new to NiFi and exploring it as an open source ETL tool.
> >> >>> >> >> >
> >> >>> >> >> > As per my understanding, flow files are stored on local disk and they
> >> >>> >> >> > contain the actual data.
> >> >>> >> >> > If the above is true, let's consider the scenarios below:
> >> >>> >> >> >
> >> >>> >> >> > Scenario 1:
> >> >>> >> >> > - In a spool directory we have terabytes (5-6TB/day) of files coming
> >> >>> >> >> > from external sources
> >> >>> >> >> > - I want to push those files to HDFS as they are, without any changes
> >> >>> >> >> >
> >> >>> >> >> > Scenario 2:
> >> >>> >> >> > - In a spool directory we have terabytes (5-6TB/day) of files coming
> >> >>> >> >> > from external sources
> >> >>> >> >> > - I want to mask some of the sensitive columns
> >> >>> >> >> > - Then send one copy to HDFS and another copy to Kafka
> >> >>> >> >> >
> >> >>> >> >> > Question for Scenario 1:
> >> >>> >> >> > 1.a In that case those 5-6TB of data will be written again on local
> >> >>> >> >> > disk as flow files and will cause double I/O, which eventually may
> >> >>> >> >> > cause slower performance due to an I/O bottleneck.
> >> >>> >> >> > Is there any way to bypass writing flow files on disk or directly pass
> >> >>> >> >> > those files to HDFS as they are?
> >> >>> >> >> > 1.b If the files on the spool directory are compressed (zip/gzip), can
> >> >>> >> >> > we store files on HDFS as uncompressed?
> >> >>> >> >> >
> >> >>> >> >> > Question for Scenario 2:
> >> >>> >> >> > 2.a Can we use our existing java code for masking? If yes, then how?
> >> >>> >> >> > 2.b For this scenario we also want to bypass storing flow files on
> >> >>> >> >> > disk. Can we do it on the fly, masking and storing on HDFS?
> >> >>> >> >> > 2.c If the source files are compressed (zip/gzip), is there any issue
> >> >>> >> >> > for masking here?
> >> >>> >> >> >
> >> >>> >> >> >
> >> >>> >> >> > In fact, I tried the above using flume+flume interceptors. Everything
> >> >>> >> >> > works fine with smaller files. But when source files are greater than
> >> >>> >> >> > 50MB flume chokes :(.
> >> >>> >> >> > So, I am exploring options in NiFi. Hope I will get some guidelines
> >> >>> >> >> > from you guys.
> >> >>> >> >> >
> >> >>> >> >> >
> >> >>> >> >> > Thanks in advance.
> >> >>> >> >> > -Obaid
> >> >>> >> >
> >> >>> >> >
> >> >
> >> >
> >
> >
>
