Obaid, I can't say for sure how much this would improve performance, but you might want to wrap the OutputStream with a BufferedOutputStream or BufferedWriter. I'd be curious to hear if that helps.
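Something like this (an untested sketch adapting the snippet from your mail; BufferedWriter/OutputStreamWriter are plain java.io, and the idea is just to batch the many small per-line writes instead of hitting the content repository once per line):

    flowfile = session.write(flowfile, new StreamCallback() {
        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            // buffer the output too, so each line is not a separate write to the repository
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(line);   // in your processor this would be writer.write(parseLine(...))
                writer.newLine();
            }
            writer.flush();
        }
    });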
A similar scenario from the standard processors is ReplaceText; here is one example where it uses the StreamCallback:

https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L337

-Bryan

On Tue, Jan 12, 2016 at 8:38 PM, obaidul karim <obaidc...@gmail.com> wrote:
> Hi Joe,
>
> Yes, I took the existing RAID and HW settings into consideration. We have 10G NICs for all Hadoop intra-connectivity, and the server in question is an edge node of our Hadoop cluster. In the production scenario we will use dedicated ETL servers with high-performance (>500 MB/s) local disks.
>
> Sharing some good news: I have successfully masked & loaded 110 GB of data to HDFS using the flow below:
>
> ExecuteProcess (touch and mv to input dir) > ListFile (1 thread) > FetchFile (1 thread) > maskColumn (4 threads) > PutHDFS (1 thread)
>
> * used 4 threads for masking and 1 for the others because I found masking is the slowest component.
>
> However, it seems too slow: it was processing a 2 GB file in 6 minutes. It may be because of my masking algorithm (although the masking algorithm is pretty simple FPE with a simple twist). Still, I want to be sure that the way I have written the custom processor is the most efficient way. Please see the code chunk below and let me know whether it is the fastest way to process flowfiles (CSV source files) that need modifications on specific columns:
>
> * the parseLine method contains the masking logic.
>
> flowfile = session.write(flowfile, new StreamCallback() {
>     @Override
>     public void process(InputStream in, OutputStream out) throws IOException {
>
>         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
>         String line;
>         if (skipHeader == true && headerExists == true) {
>             // to skip the header, do an additional line fetch before going to the next step
>             if (reader.ready()) reader.readLine();
>         } else if (skipHeader == false && headerExists == true) {
>             // if the header is not skipped then there is no need to mask it, just pass it through
>             if (reader.ready()) out.write((reader.readLine() + "\n").getBytes());
>         }
>
>         // decide about empty lines early
>         while ((line = reader.readLine()) != null) {
>             if (line.trim().length() > 0) {
>                 out.write(parseLine(line, seperator, quote, escape, maskColumns).getBytes());
>             }
>         }
>         out.flush();
>     }
> });
>
> Thanks in advance.
> -Obaid
>
> On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.w...@gmail.com> wrote:
>> Obaid,
>>
>> Really happy you're seeing the performance you need. That works out to about 110 MB/s on average over that period. Any chance you have a 1Gb NIC? If you really want to have fun with performance tuning you can use things like iostat and other commands to observe disk, network, and CPU. Something else to consider is the potential throughput gain of multiple RAID-1 containers rather than RAID-5, since NiFi can use them in parallel. Depends on your goals/workload, so just an FYI.
>>
>> A good reference for how to build a processor that alters the data (transformation) is here [1]. It is a good idea to do a quick read through that document. Also, one of the great things you can do is look at existing processors. Some good examples relevant to transformation are [2], [3], and [4], which are quite simple stream-transform types. Or take a look at [5], which is a more complicated example.
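>>
>> For a rough sense of the shape, the skeleton those simple transform processors share looks something like this (illustrative only, not copied from any one of them; REL_SUCCESS is a relationship your processor would define):
>>
>>     @Override
>>     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
>>         FlowFile flowFile = session.get();
>>         if (flowFile == null) {
>>             return;
>>         }
>>         flowFile = session.write(flowFile, new StreamCallback() {
>>             @Override
>>             public void process(final InputStream in, final OutputStream out) throws IOException {
>>                 // read from 'in', transform, write to 'out'; the framework streams
>>                 // content to/from the repository, so the heap stays small
>>             }
>>         });
>>         session.transfer(flowFile, REL_SUCCESS);
>>     }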
>> You might also be excited to know that there is some really cool work done to bring various languages into NiFi, which looks on track to be available in the upcoming 0.5.0 release as NIFI-210 [6]. That will provide a really great option to quickly build transforms using languages like Groovy, JRuby, JavaScript, Scala, Lua, and Jython.
>>
>> [1] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
>> [2] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
>> [3] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
>> [4] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
>> [5] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
>> [6] https://issues.apache.org/jira/browse/NIFI-210
>>
>> Thanks
>> Joe
>>
>> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim <obaidc...@gmail.com> wrote:
>> > Hi Joe,
>> >
>> > Just completed my test with 100 GB of data (on a local RAID-5 disk on a single server).
>> >
>> > I was able to load 100 GB of data within 15 minutes (awesome!!) using the flow below. This throughput is enough to load 10 TB of data in a day with a single, simple machine. During the test, server disk I/O went up to 200 MB/s.
>> >
>> > ExecuteProcess (touch and mv to input dir) > ListFile > FetchFile (4 threads) > PutHDFS (4 threads)
>> >
>> > My next action is to incorporate my Java code for column masking into a custom processor. I am now exploring that. However, if you have any good references on custom processors (altering actual data), please let me know.
>> >
>> > Thanks,
>> > Obaid
>> >
>> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidc...@gmail.com> wrote:
>> >> Hi Joe,
>> >>
>> >> Yes, a symlink is another option I was considering when I was trying to use GetFile. Thanks for your insights; I will update this mail chain when my entire workflow is complete, so that it can be a reference for others :).
>> >>
>> >> -Obaid
>> >>
>> >> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
>> >>> Obaid,
>> >>>
>> >>> You make a great point.
>> >>>
>> >>> I agree we will ultimately need to do more to make that very valid approach work easily. The downside is that it puts the onus on NiFi to keep track of a variety of potentially quite large state about the directory. One way to avoid that expense is for NiFi to pull a copy of, and then delete, the source file. If you'd like to keep a copy around, I wonder if a good approach is to simply create a symlink to the original file you want NiFi to pull, but have the symlink in the NiFi pickup directory. NiFi is then free to read and delete, which means it simply pulls whatever shows up in that directory and doesn't have to keep state about filenames and checksums.
>> >>>
>> >>> I realize we still need to do what you're suggesting as well, but thought I'd run this by you.
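>> >>>
>> >>> In code terms (paths invented purely for illustration), the producer side would just do something like:
>> >>>
>> >>>     import java.nio.file.Files;
>> >>>     import java.nio.file.Paths;
>> >>>
>> >>>     // Drop a symlink into the NiFi pickup directory. NiFi reads and then
>> >>>     // deletes only the link; the original file stays where it is.
>> >>>     Files.createSymbolicLink(
>> >>>         Paths.get("/data/nifi-input/file1.csv"),        // the link NiFi picks up
>> >>>         Paths.get("/data/archive/2016-01/file1.csv"));  // the original file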
>> >>>
>> >>> Joe
>> >>>
>> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidc...@gmail.com> wrote:
>> >>> > Hi Joe,
>> >>> >
>> >>> > Consider a scenario where we need to feed some older files and we are using "mv" to feed files to the input directory (to reduce I/O we may use "mv"). If we use "mv", the last-modified date will not change, and this is very common on a busy file collection system.
>> >>> >
>> >>> > However, I think I can still manage it by adding an additional "touch" before moving the file into the target directory.
>> >>> >
>> >>> > So, my suggestion is to make the file selection criteria a configurable option in the ListFile processor. Options could be last-modified date (as currently), unique file names, checksum, etc.
>> >>> >
>> >>> > Thanks again man.
>> >>> > -Obaid
>> >>> >
>> >>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
>> >>> >> Hello Obaid,
>> >>> >>
>> >>> >> The default behavior of the ListFile processor is to keep track of the last-modified time of the files it lists. When you changed the name of the file, that didn't change the last-modified time as tracked by the OS, but when you altered the content it did. A simple 'touch' on the file would do it too.
>> >>> >>
>> >>> >> I believe we could observe the last-modified time of the directory in which the file lives to detect something like a rename. However, we'd not know which file was renamed, just that something changed. So it would require keeping some potentially problematic state to deconflict, or requiring the user to run a duplicate-detection process afterwards.
>> >>> >>
>> >>> >> So with that in mind, is the current behavior sufficient for your case?
>> >>> >>
>> >>> >> Thanks
>> >>> >> Joe
>> >>> >>
>> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <obaidc...@gmail.com> wrote:
>> >>> >> > Hi Joe,
>> >>> >> >
>> >>> >> > I am now exploring your solution, starting with the flow below:
>> >>> >> >
>> >>> >> > ListFile > FetchFile > CompressContent > PutFile
>> >>> >> >
>> >>> >> > All seems fine, except for some confusion about how ListFile identifies new files. In order to test, I renamed an already-processed file and put it in the input folder, and found that the file was not processed. Then I randomly changed the content of the file and it was immediately processed.
>> >>> >> >
>> >>> >> > My question is: what is the new-file selection criterion for "ListFile"? Can I change it to file name only?
>> >>> >> >
>> >>> >> > Thanks in advance.
>> >>> >> >
>> >>> >> > -Obaid
>> >>> >> >
>> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt <joe.w...@gmail.com> wrote:
>> >>> >> >> Hello Obaid,
>> >>> >> >>
>> >>> >> >> At 6 TB/day and an average size of 2-3 GB per dataset you're looking at a sustained rate of 70+ MB/s and a pretty low transaction rate, so well within a good range to work with on a single system.
>> >>> >> >>
>> >>> >> >> "Is there any way to bypass writing flow files on disk or directly pass those files to HDFS as-is?"
>> >>> >> >>
>> >>> >> >> There is no way to bypass NiFi taking a copy of that data, by design. NiFi is helping you formulate a graph of dataflow requirements from given source(s) through given processing steps, ultimately driving data into given destination systems. As a result it takes on the challenge of handling the transactionality of each interaction and the buffering and backpressure needed to deal with the realities of different production/consumption patterns.
>> >>> >> >>
>> >>> >> >> "If the files in the spool directory are compressed (zip/gzip), can we store files on HDFS as uncompressed?"
>> >>> >> >>
>> >>> >> >> Certainly. Both of those formats (zip/gzip) are supported in NiFi out of the box. You simply run the data through the proper processor prior to the PutHDFS processor to unpack (zip) or decompress (gzip) as needed.
>> >>> >> >>
>> >>> >> >> "2.a Can we use our existing Java code for masking? If yes, then how?
>> >>> >> >> 2.b For this scenario we also want to bypass storing flow files on disk. Can we do it on the fly, masking and storing on HDFS?
>> >>> >> >> 2.c If the source files are compressed (zip/gzip), is there any issue for masking here?"
>> >>> >> >>
>> >>> >> >> You would build a custom NiFi processor that leverages your existing code. If your code is able to operate on an InputStream and write to an OutputStream, then it is very likely you'll be able to handle arbitrarily large objects with zero negative impact on the JVM heap. This is thanks to the fact that the data is present in NiFi's repository with copy-on-write/pass-by-reference semantics, and that the API exposes those streams to your code in a transactional manner.
>> >>> >> >>
>> >>> >> >> If you want the process of writing to HDFS to also do decompression and masking in one pass, you'll need to extend/alter the PutHDFS processor to do that. It is probably best to implement the flow using cohesive processors (grab files, decompress files, mask files, write to HDFS). Given how the repository construct in NiFi works, and given how caching in Linux works, it is very possible you'll be quite surprised by the throughput you'll see. Even then, you can optimize once you're sure you need to. The other thing to keep in mind is that often a flow that starts out as specific as this turns into a great place to tap the stream of data to feed some new system or new algorithm with a different format or protocol. At that moment the benefits become even more obvious.
>> >>> >> >>
>> >>> >> >> Regarding the Flume processors in NiFi and their memory usage: NiFi offers a nice hosting mechanism for Flume processes and brings some of the benefits of NiFi's UI, provenance, and repository concept.
>> >>> >> >> However, we're still largely limited to the design assumptions one gets when building a Flume process, and that can be quite memory-limiting. We see what we have today as a great way to help people transition their existing Flume flows into NiFi by leveraging their existing code, but we would recommend working to phase those out in time so that you can take full advantage of what NiFi brings over Flume.
>> >>> >> >>
>> >>> >> >> Thanks
>> >>> >> >> Joe
>> >>> >> >>
>> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim <obaidc...@gmail.com> wrote:
>> >>> >> >> > Hi,
>> >>> >> >> >
>> >>> >> >> > I am new to NiFi and exploring it as an open-source ETL tool.
>> >>> >> >> >
>> >>> >> >> > As per my understanding, flow files are stored on local disk and contain the actual data. If the above is true, let's consider the scenarios below:
>> >>> >> >> >
>> >>> >> >> > Scenario 1:
>> >>> >> >> > - In a spool directory we have terabytes (5-6 TB/day) of files coming from external sources
>> >>> >> >> > - I want to push those files to HDFS as-is, without any changes
>> >>> >> >> >
>> >>> >> >> > Scenario 2:
>> >>> >> >> > - In a spool directory we have terabytes (5-6 TB/day) of files coming from external sources
>> >>> >> >> > - I want to mask some of the sensitive columns
>> >>> >> >> > - Then send one copy to HDFS and another copy to Kafka
>> >>> >> >> >
>> >>> >> >> > Questions for Scenario 1:
>> >>> >> >> > 1.a In that case those 5-6 TB of data will be written again on local disk as flow files and will cause double I/O, which eventually may cause slower performance due to an I/O bottleneck.
>> >>> >> >> > Is there any way to bypass writing flow files on disk, or directly pass those files to HDFS as-is?
>> >>> >> >> > 1.b If the files in the spool directory are compressed (zip/gzip), can we store the files on HDFS as uncompressed?
>> >>> >> >> >
>> >>> >> >> > Questions for Scenario 2:
>> >>> >> >> > 2.a Can we use our existing Java code for masking? If yes, then how?
>> >>> >> >> > 2.b For this scenario we also want to bypass storing flow files on disk. Can we do it on the fly, masking and storing on HDFS?
>> >>> >> >> > 2.c If the source files are compressed (zip/gzip), is there any issue for masking here?
>> >>> >> >> >
>> >>> >> >> > In fact, I tried the above using Flume + Flume interceptors. Everything worked fine with smaller files, but when source files get bigger than 50 MB, Flume chokes :(.
>> >>> >> >> > So, I am exploring options in NiFi. I hope I will get some guidance from you guys.
>> >>> >> >> >
>> >>> >> >> > Thanks in advance.
>> >>> >> >> > -Obaid