Hi Joe & others,

Thanks for all of your suggestions.

I am now using the code below:

1. Buffered reader (I tried to use NLKBufferedReader, but it requires too many libs and NiFi failed to start; I was lost.)
2. Buffered writer
3. Appending the line ending instead of concatenating a newline

Still no performance gain. Am I doing something wrong? Is there anything else I can change here?

flowfile = session.write(flowfile, new StreamCallback() {
    @Override
    public void process(InputStream in, OutputStream out) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset), maxBufferSize);
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, charset))) {

            if (skipHeader == true && headerExists == true) { // to skip the header, do an additional line fetch before going to the next step
                if (reader.ready()) reader.readLine();
            } else if (skipHeader == false && headerExists == true) { // if the header is not skipped then there is no need to mask it, just pass it through
                if (reader.ready()) {
                    writer.write(reader.readLine());
                    writer.write(lineEndingBuilder.toString());
                }
            }

            // decide about empty lines earlier
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(parseLine(line, seperator, quote, escape, maskColumns));
                writer.write(lineEndingBuilder.toString());
            }
            writer.flush();
        }
    }
});

-Obaid

On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt <joe.w...@gmail.com> wrote:
> Hello
>
> So the performance went from what sounded pretty good to what sounds
> pretty problematic. The rate now sounds like it is around 5MB/s, which
> is indeed quite poor. Building on what Bryan said, there do appear
> to be some good opportunities to improve the performance. The link he
> provided, expanded to cover the full range to look at, is here [1].
>
> A couple of key points to note:
> 1) Use a buffered, line-oriented reader that preserves the newlines
> 2) Write to a buffered writer that accepts strings and understands
> which charset you intend to write out
> 3) Avoid string concat with the newline
>
> Also keep in mind how large any single line could be, because if
> lines can be quite large you may need to consider the GC pressure that
> can be caused. But let's take a look at how things are after these
> easier steps first.
>
> [1]
> https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361
>
> Thanks
> Joe
>
> On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros <helloj...@gmail.com>
> wrote:
> > Obaid,
> >
> > Since you mention that you will have dedicated ETL servers and assume they
> > will also have a decent amount of RAM on them, I would not shy away
> > from increasing your threads.
> >
> > Also, in your staging directory, if you do not need to keep originals, you
> > might consider GetFile, and on that one use one thread.
> >
> > Hi Joe,
> >
> > Yes, I took the existing RAID and HW settings into consideration. We have a 10G NIC
> > for all Hadoop intra-connectivity, and the server in question is an edge node
> > of our Hadoop cluster.
> > In the production scenario we will use dedicated ETL servers having
> > high-performance (>500MB/s) local disks.
> >
> > Sharing some good news: I have successfully masked & loaded 110 GB of data to HDFS
> > using the flow below:
> >
> > ExecuteProcess (touch and mv to input dir) > ListFile (1 thread) > FetchFile
> > (1 thread) > maskColumn (4 threads) > PutHDFS (1 thread).
> >
> > * I used 4 threads for masking and 1 for the others because I found it is the
> > slowest component.
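A minimal, untested sketch of the same callback with two small tweaks: the line-ending string is built once outside the loop instead of calling lineEndingBuilder.toString() per line, and the empty-line guard from the earlier version is kept. It assumes the same surrounding processor fields (charset, maxBufferSize, lineEndingBuilder, skipHeader, headerExists, seperator, quote, escape, maskColumns) and the parseLine method shown above:

flowfile = session.write(flowfile, new StreamCallback() {
    @Override
    public void process(InputStream in, OutputStream out) throws IOException {
        // Build the line ending once instead of rebuilding the string for every line.
        final String lineEnding = lineEndingBuilder.toString();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset), maxBufferSize);
             BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, charset))) {

            if (headerExists) {
                final String header = reader.readLine();
                if (!skipHeader && header != null) {
                    // Header is passed through untouched; no masking needed.
                    writer.write(header);
                    writer.write(lineEnding);
                }
            }

            String line;
            while ((line = reader.readLine()) != null) {
                // Keep the empty-line guard from the earlier version.
                if (line.trim().isEmpty()) {
                    continue;
                }
                writer.write(parseLine(line, seperator, quote, escape, maskColumns));
                writer.write(lineEnding);
            }
            writer.flush();
        }
    }
});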
> >
> > However, it seems to be too slow: it was processing a 2GB file in 6 minutes.
> > It may be because of my masking algorithm (although the masking algorithm is
> > a pretty simple FPE with a simple twist).
> > However, I want to be sure that the way I have written the custom processor is
> > the most efficient way. Please see the code chunk below and let me know whether
> > it is the fastest way to process flowfiles (csv source files) which need
> > modifications on specific columns:
> >
> > * the parseLine method contains the logic for masking.
> >
> > flowfile = session.write(flowfile, new StreamCallback() {
> >     @Override
> >     public void process(InputStream in, OutputStream out) throws IOException {
> >
> >         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
> >         String line;
> >         if (skipHeader == true && headerExists == true) { // to skip header, do an additional line fetch before going to next step
> >             if (reader.ready()) reader.readLine();
> >         } else if (skipHeader == false && headerExists == true) { // if header is not skipped then no need to mask, just pass through
> >             if (reader.ready()) out.write((reader.readLine() + "\n").getBytes());
> >         }
> >
> >         // decide about empty line earlier
> >         while ((line = reader.readLine()) != null) {
> >             if (line.trim().length() > 0) {
> >                 out.write(parseLine(line, seperator, quote, escape, maskColumns).getBytes());
> >             }
> >         }
> >         out.flush();
> >     }
> > });
> >
> >
> > Thanks in advance.
> > -Obaid
> >
> >
> > On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >>
> >> Obaid,
> >>
> >> Really happy you're seeing the performance you need. That works out
> >> to about 110MB/s on average over that period. Any chance you have a
> >> 1GB NIC? If you really want to have fun with performance tuning you
> >> can use things like iostat and other commands to observe disk,
> >> network, and CPU. Something else to consider too is the potential
> >> throughput gains of multiple RAID-1 containers rather than RAID-5,
> >> since NiFi can use both in parallel. Depends on your goals/workload,
> >> so just an FYI.
> >>
> >> A good reference for how to build a processor which does altering of
> >> the data (transformation) is here [1]. It is a good idea to do a
> >> quick read through that document. Also, one of the great things you
> >> can do as well is look at existing processors. Some good examples
> >> relevant to transformation are [2], [3], and [4], which are quite
> >> simple stream transform types. Or take a look at [5], which is a more
> >> complicated example. You might also be excited to know that there is
> >> some really cool work done to bring various languages into NiFi which
> >> looks on track to be available in the upcoming 0.5.0 release, which is
> >> NIFI-210 [6]. That will provide a really great option to quickly
> >> build transforms using languages like Groovy, JRuby, JavaScript,
> >> Scala, Lua, and Jython.
> >>
> >> [1]
> >> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
> >>
> >> [2]
> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
> >>
> >> [3]
> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
> >>
> >> [4]
> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
> >>
> >> [5]
> >> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
> >>
> >> [6] https://issues.apache.org/jira/browse/NIFI-210
> >>
> >> Thanks
> >> Joe
> >>
> >> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim <obaidc...@gmail.com> wrote:
> >> > Hi Joe,
> >> >
> >> > Just completed my test with 100GB of data (on a local RAID 5 disk on a
> >> > single server).
> >> >
> >> > I was able to load the 100GB within 15 minutes (awesome!!) using the flow below.
> >> > This throughput is enough to load 10TB of data in a day with a single and
> >> > simple machine.
> >> > During the test, server disk I/O went up to 200MB/s.
> >> >
> >> > ExecuteProcess (touch and mv to input dir) > ListFile > FetchFile (4
> >> > threads) > PutHDFS (4 threads)
> >> >
> >> > My next action is to incorporate my Java code for column masking with a
> >> > custom processor.
> >> > I am now exploring that. However, if you have any good reference on a
> >> > custom processor (altering actual data) please let me know.
> >> >
> >> > Thanks,
> >> > Obaid
> >> >
> >> >
> >> >
> >> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidc...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Hi Joe,
> >> >>
> >> >> Yes, a symlink is another option I was considering when I was trying to use
> >> >> GetFile.
> >> >> Thanks for your insights. I will update you on this mail chain when my
> >> >> entire workflow completes, so that this could be a reference for others
> >> >> :).
> >> >>
> >> >> -Obaid
> >> >>
> >> >> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>>
> >> >>> Obaid,
> >> >>>
> >> >>> You make a great point.
> >> >>>
> >> >>> I agree we will ultimately need to do more to make that very valid
> >> >>> approach work easily. The downside is that it puts the onus on NiFi to
> >> >>> keep track of a variety of potentially quite large state about the
> >> >>> directory. One way to avoid that expense is if NiFi can pull a copy
> >> >>> of, and then delete, the source file. If you'd like to keep a copy around, I
> >> >>> wonder if a good approach is to simply create a symlink to the
> >> >>> original file you want NiFi to pull but have the symlink in the NiFi
> >> >>> pickup directory. NiFi is then free to read and delete, which means it
> >> >>> simply pulls whatever shows up in that directory and doesn't have to
> >> >>> keep state about filenames and checksums.
> >> >>>
> >> >>> I realize we still need to do what you're suggesting as well but
> >> >>> thought I'd run this by you.
> >> >>>
> >> >>> Joe
> >> >>>
> >> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidc...@gmail.com>
> >> >>> wrote:
> >> >>> > Hi Joe,
> >> >>> >
> >> >>> > Consider a scenario where we need to feed some older files and we are
> >> >>> > using "mv" to feed files to the input directory (to reduce IO we may use
> >> >>> > "mv"). If we use "mv", the last modified date will not be changed. And this
> >> >>> > is very common on a busy file collection system.
> >> >>> >
> >> >>> > However, I think I can still manage it by adding an additional "touch"
> >> >>> > before moving the file into the target directory.
> >> >>> >
> >> >>> > So, my suggestion is to add the file selection criteria as a configurable
> >> >>> > option on the ListFile processor in the workflow. Options could be last
> >> >>> > modified date (as the current one), unique file names, checksum, etc.
> >> >>> >
> >> >>> > Thanks again man.
> >> >>> > -Obaid
> >> >>> >
> >> >>> >
> >> >>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>> >>
> >> >>> >> Hello Obaid,
> >> >>> >>
> >> >>> >> The default behavior of the ListFile processor is to keep track of the
> >> >>> >> last modified time of the files it lists. When you changed the name
> >> >>> >> of the file, that did not change the last modified time as tracked by
> >> >>> >> the OS, but when you altered the content it did. Simply doing 'touch' on the
> >> >>> >> file would do it too.
> >> >>> >>
> >> >>> >> I believe we could observe the last modified time of the directory in
> >> >>> >> which the file lives to detect something like a rename. However, we'd
> >> >>> >> not know which file was renamed, just that something was changed. So
> >> >>> >> it requires keeping some potentially problematic state to deconflict, or
> >> >>> >> requiring the user to have a duplicate detection process afterwards.
> >> >>> >>
> >> >>> >> So with that in mind, is the current behavior sufficient for your case?
> >> >>> >>
> >> >>> >> Thanks
> >> >>> >> Joe
> >> >>> >>
> >> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <obaidc...@gmail.com>
> >> >>> >> wrote:
> >> >>> >> > Hi Joe,
> >> >>> >> >
> >> >>> >> > I am now exploring your solution,
> >> >>> >> > starting with the flow below:
> >> >>> >> >
> >> >>> >> > ListFile > FetchFile > CompressContent > PutFile.
> >> >>> >> >
> >> >>> >> > All seems fine, except for some confusion with how ListFile identifies
> >> >>> >> > new files.
> >> >>> >> > In order to test, I renamed an already processed file, put it in the
> >> >>> >> > input folder, and found that the file is not processed.
> >> >>> >> > Then I randomly changed the content of the file and it was immediately
> >> >>> >> > processed.
> >> >>> >> >
> >> >>> >> > My question is, what is the new-file selection criterion for "ListFile"?
> >> >>> >> > Can I change it to file name only?
> >> >>> >> >
> >> >>> >> > Thanks in advance.
> >> >>> >> >
> >> >>> >> > -Obaid
> >> >>> >> >
> >> >>> >> >
> >> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt <joe.w...@gmail.com>
> >> >>> >> > wrote:
> >> >>> >> >>
> >> >>> >> >> Hello Obaid,
> >> >>> >> >>
> >> >>> >> >> At 6 TB/day and an average size of 2-3GB per dataset you're looking at a
> >> >>> >> >> sustained rate of 70+MB/s and a pretty low transaction rate. So, well
> >> >>> >> >> within a good range to work with on a single system.
> >> >>> >> >>
> >> >>> >> >> "Is there any way to bypass writing flow files on disk or directly
> >> >>> >> >> pass those files to HDFS as it is?"
> >> >>> >> >>
> >> >>> >> >> There is no way to bypass NiFi taking a copy of that data, by design.
> >> >>> >> >> NiFi is helping you formulate a graph of dataflow requirements from a
> >> >>> >> >> given source(s) through given processing steps, ultimately driving
> >> >>> >> >> data into given destination systems. As a result it takes on the
> >> >>> >> >> challenge of handling the transactionality of each interaction and the
> >> >>> >> >> buffering and backpressure to deal with the realities of different
> >> >>> >> >> production/consumption patterns.
> >> >>> >> >>
> >> >>> >> >> "If the files on the spool directory are compressed (zip/gzip), can we
> >> >>> >> >> store files on HDFS as uncompressed?"
> >> >>> >> >>
> >> >>> >> >> Certainly. Both of those formats (zip/gzip) are supported in NiFi
> >> >>> >> >> out of the box. You simply run the data through the proper process
> >> >>> >> >> prior to the PutHDFS process to unpack (zip) or decompress (gzip) as
> >> >>> >> >> needed.
> >> >>> >> >>
> >> >>> >> >> "2.a Can we use our existing Java code for masking? If yes, then how?
> >> >>> >> >> 2.b For this scenario we also want to bypass storing flow files on
> >> >>> >> >> disk. Can we do it on the fly, masking and storing on HDFS?
> >> >>> >> >> 2.c If the source files are compressed (zip/gzip), is there any issue
> >> >>> >> >> for masking here?"
> >> >>> >> >>
> >> >>> >> >> You would build a custom NiFi processor that leverages your existing
> >> >>> >> >> code. If your code is able to operate on an InputStream and write to
> >> >>> >> >> an OutputStream then it is very likely you'll be able to handle
> >> >>> >> >> arbitrarily large objects with zero negative impact to the JVM heap as
> >> >>> >> >> well. This is thanks to the fact that the data is present in NiFi's
> >> >>> >> >> repository with copy-on-write/pass-by-reference semantics and that the
> >> >>> >> >> API is exposing those streams to your code in a transactional manner.
> >> >>> >> >>
> >> >>> >> >> If you want the process of writing to HDFS to also do decompression
> >> >>> >> >> and masking in one pass you'll need to extend/alter the PutHDFS
> >> >>> >> >> processor to do that. It is probably best to implement the flow using
> >> >>> >> >> cohesive processors (grab files, decompress files, mask files, write
> >> >>> >> >> to HDFS).
> >> >>> >> >> Given how the repository construct in NiFi works and given
> >> >>> >> >> how caching in Linux works, it is very possible you'll be quite
> >> >>> >> >> surprised by the throughput you'll see. Even then you can optimize
> >> >>> >> >> once you're sure you need to. The other thing to keep in mind here is
> >> >>> >> >> that often a flow that starts out as specific as this turns into a
> >> >>> >> >> great place to tap the stream of data to feed some new system or new
> >> >>> >> >> algorithm with a different format or protocol. At that moment the
> >> >>> >> >> benefits become even more obvious.
> >> >>> >> >>
> >> >>> >> >> Regarding the Flume processes in NiFi and their memory usage: NiFi
> >> >>> >> >> offers a nice hosting mechanism for the Flume processes and brings
> >> >>> >> >> some of the benefits of NiFi's UI, provenance, and repository concept.
> >> >>> >> >> However, we're still largely limited to the design assumptions one
> >> >>> >> >> gets when building a Flume process, and that can be quite memory
> >> >>> >> >> limiting. We see what we have today as a great way to help people
> >> >>> >> >> transition their existing Flume flows into NiFi by leveraging their
> >> >>> >> >> existing code, but would recommend working to phase the use of those
> >> >>> >> >> out in time so that you can take full benefit of what NiFi brings over
> >> >>> >> >> Flume.
> >> >>> >> >>
> >> >>> >> >> Thanks
> >> >>> >> >> Joe
> >> >>> >> >>
> >> >>> >> >>
> >> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim <obaidc...@gmail.com>
> >> >>> >> >> wrote:
> >> >>> >> >> > Hi,
> >> >>> >> >> >
> >> >>> >> >> > I am new to NiFi and exploring it as an open source ETL tool.
> >> >>> >> >> >
> >> >>> >> >> > As per my understanding, flow files are stored on the local disk and
> >> >>> >> >> > contain the actual data.
> >> >>> >> >> > If the above is true, let's consider the scenarios below:
> >> >>> >> >> >
> >> >>> >> >> > Scenario 1:
> >> >>> >> >> > - In a spool directory we have terabytes (5-6TB/day) of files coming
> >> >>> >> >> > from external sources
> >> >>> >> >> > - I want to push those files to HDFS as-is without any changes
> >> >>> >> >> >
> >> >>> >> >> > Scenario 2:
> >> >>> >> >> > - In a spool directory we have terabytes (5-6TB/day) of files coming
> >> >>> >> >> > from external sources
> >> >>> >> >> > - I want to mask some of the sensitive columns
> >> >>> >> >> > - Then send one copy to HDFS and another copy to Kafka
> >> >>> >> >> >
> >> >>> >> >> > Questions for Scenario 1:
> >> >>> >> >> > 1.a In that case those 5-6TB of data will be written again on the local
> >> >>> >> >> > disk as flow files and will cause double I/O, which eventually may
> >> >>> >> >> > cause slower performance due to an I/O bottleneck.
> >> >>> >> >> > Is there any way to bypass writing flow files on disk or directly
> >> >>> >> >> > pass those files to HDFS as it is?
> >> >>> >> >> > 1.b If the files on the spool directory are compressed (zip/gzip), can we
> >> >>> >> >> > store files on HDFS as uncompressed?
> >> >>> >> >> >
> >> >>> >> >> > Questions for Scenario 2:
> >> >>> >> >> > 2.a Can we use our existing Java code for masking? If yes, then how?
> >> >>> >> >> > 2.b For this scenario we also want to bypass storing flow files on disk.
> >> >>> >> >> > Can we do it on the fly, masking and storing on HDFS?
> >> >>> >> >> > 2.c If the source files are compressed (zip/gzip), is there any issue
> >> >>> >> >> > for masking here?
> >> >>> >> >> >
> >> >>> >> >> > In fact, I tried the above using Flume + Flume interceptors. Everything
> >> >>> >> >> > works fine with smaller files, but when the source files are greater
> >> >>> >> >> > than 50MB Flume chokes :(.
> >> >>> >> >> > So, I am exploring options in NiFi. I hope I will get some guidance
> >> >>> >> >> > from you guys.
> >> >>> >> >> >
> >> >>> >> >> > Thanks in advance.
> >> >>> >> >> > -Obaid
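
For reference, a minimal, untested skeleton of the kind of stream-to-stream custom processor discussed in this thread (read the incoming content, transform it line by line, write the result back via session.write). The class name MaskColumns, the mask() placeholder, and the fixed UTF-8 charset are illustrative assumptions, not the actual implementation:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;

public class MaskColumns extends AbstractProcessor {

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Masked flow files")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Stream the content through: the full file is never loaded into the heap.
        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                     BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        writer.write(mask(line)); // mask(...) stands in for the real column-masking logic
                        writer.write("\n");
                    }
                }
            }
        });

        session.transfer(flowFile, REL_SUCCESS);
    }

    // Hypothetical placeholder for the masking logic discussed in the thread.
    private String mask(String line) {
        return line;
    }
}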