Joe,

Last time it was:

java.arg.2=-Xms512m
java.arg.3=-Xmx512m
Now I changed it as below:

java.arg.2=-Xms5120m
java.arg.3=-Xmx10240m

The latest jstat & iostat output are attached. To me it is still slow; no significant improvement.

-Obaid

On Thu, Jan 14, 2016 at 12:41 PM, Joe Witt <joe.w...@gmail.com> wrote:
> Obaid,
>
> Great, so this is helpful info. The iostat output shows both CPU and disk
> are generally bored and ready for more work. Looking at the gc output,
> though, suggests trouble. We see 32 samples at a 1-second spread each,
> and in that time more than 6 seconds were spent doing garbage
> collection, including 5 full collections. That is usually a sign of
> inefficient heap usage and/or simply an undersized heap. What size are
> your heap settings in the conf/bootstrap.conf file?
>
> Thanks
> Joe
>
> On Wed, Jan 13, 2016 at 11:32 PM, obaidul karim <obaidc...@gmail.com> wrote:
> > Hi Joe,
> >
> > Please find attached jstat & iostat output.
> >
> > So far it seems to me that it is CPU bound. However, your eyes are better
> > than mine :).
> >
> > -Obaid
> >
> > On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt <joe.w...@gmail.com> wrote:
> >>
> >> Hello
> >>
> >> Let's narrow in on potential issues. While this process is running
> >> and appears sluggish in nature, please run the following on the
> >> command line:
> >>
> >> 'jps'
> >>
> >> This command will tell you the process id of NiFi. You'll want the
> >> pid associated with the Java process other than the one called 'jps',
> >> presuming nothing other than NiFi is running at the time.
> >>
> >> Let's say the result is a pid of '12345'. Then run this command:
> >>
> >> 'jstat -gcutil 12345 1000'
> >>
> >> This will generate garbage collection information every second until
> >> you decide to stop it with Ctrl-C. So let that run for a while, say
> >> 30 seconds or so, then hit Ctrl-C. Can you please paste that output
> >> in response? That will show us the general health of GC.
> >>
> >> Another really important/powerful set of output can be gleaned by
> >> running 'iostat', which gives you statistics about input/output to
> >> things like the underlying storage system. It is part of the
> >> 'sysstat' package in case you need to install that. Then you can run:
> >>
> >> 'iostat xmh 1'
> >>
> >> Or something even as simple as 'iostat 1'. Your specific command
> >> string may vary. Please let that run for say 10-20 seconds and paste
> >> those results as well. That will give a sense of IO utilization while
> >> the operation is running.
> >>
> >> Between these two outputs (garbage collection/IO) we should have a
> >> pretty good idea of where to focus the effort to find why it is slow.
> >>
> >> Thanks
> >> Joe
> >>
> >> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim <obaidc...@gmail.com> wrote:
> >> > Hi Joe & Others,
> >> >
> >> > Thanks for all of your suggestions.
> >> >
> >> > Now I am using the code below:
> >> > 1. Buffered reader (I tried to use NLKBufferedReader, but it requires
> >> > too many libs & NiFi failed to start. I was lost.)
> >> > 2. Buffered writer
> >> > 3. Appending the line ending instead of concatenating a newline
> >> >
> >> > Still no performance gain. Am I doing something wrong? Is there
> >> > anything else I can change here?
> >> >
> >> > flowfile = session.write(flowfile, new StreamCallback() {
> >> >     @Override
> >> >     public void process(InputStream in, OutputStream out) throws IOException {
> >> >         try (BufferedReader reader = new BufferedReader(
> >> >                  new InputStreamReader(in, charset), maxBufferSize);
> >> >              BufferedWriter writer = new BufferedWriter(
> >> >                  new OutputStreamWriter(out, charset))) {
> >> >
> >> >             if (skipHeader == true && headerExists == true) {
> >> >                 // to skip the header, do an additional line fetch before the next step
> >> >                 if (reader.ready()) reader.readLine();
> >> >             } else if (skipHeader == false && headerExists == true) {
> >> >                 // if the header is not skipped then no need to mask, just pass it through
> >> >                 if (reader.ready()) {
> >> >                     writer.write(reader.readLine());
> >> >                     writer.write(lineEndingBuilder.toString());
> >> >                 }
> >> >             }
> >> >             // decide about empty lines earlier
> >> >             String line;
> >> >             while ((line = reader.readLine()) != null) {
> >> >                 writer.write(parseLine(line, seperator, quote, escape, maskColumns));
> >> >                 writer.write(lineEndingBuilder.toString());
> >> >             }
> >> >             writer.flush();
> >> >         }
> >> >     }
> >> > });
> >> >
> >> > -Obaid
> >> >
> >> > On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >> >>
> >> >> Hello
> >> >>
> >> >> So the performance went from what sounded pretty good to what sounds
> >> >> pretty problematic. The rate now sounds like it is around 5MB/s,
> >> >> which is indeed quite poor. Building on what Bryan said, there do
> >> >> appear to be some good opportunities to improve the performance. The
> >> >> link he provided, expanded to cover the full range to look at, is
> >> >> here [1].
> >> >>
> >> >> A couple of key points to note:
> >> >> 1) Use a buffered line-oriented reader that preserves the newlines
> >> >> 2) Write to a buffered writer that accepts strings and understands
> >> >> which charset you intend to write out
> >> >> 3) Avoid string concat with newline
> >> >>
> >> >> Also keep in mind how large any single line could be, because if
> >> >> lines can be quite large you may need to consider the GC pressure
> >> >> that can be caused. But let's take a look at how things are after
> >> >> these easier steps first.
> >> >>
> >> >> [1] https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361
> >> >>
> >> >> Thanks
> >> >> Joe
> >> >>
> >> >> On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros <helloj...@gmail.com> wrote:
> >> >> > Obaid,
> >> >> >
> >> >> > Since you mention that you will have dedicated ETL servers, and I
> >> >> > assume they will also have a decent amount of RAM on them, I would
> >> >> > not shy away from increasing your threads.
> >> >> >
> >> >> > Also, in your staging directory, if you do not need to keep the
> >> >> > originals, you might consider GetFile, and on that one use one thread.
> >> >> >
> >> >> > Hi Joe,
> >> >> >
> >> >> > Yes, I took into consideration the existing RAID and HW settings. We
> >> >> > have a 10G NIC for all Hadoop intra-connectivity, and the server in
> >> >> > question is an edge node of our Hadoop cluster.
> >> >> > In the production scenario we will use dedicated ETL servers with
> >> >> > high-performance (>500MB/s) local disks.
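Joe's three key points can be sketched in plain Java, outside NiFi. This is a minimal sketch, not the thread's actual processor: `maskLine` is a hypothetical stand-in for the real `parseLine` masking logic, and since the standard `BufferedReader` (unlike a newline-keeping reader) drops line endings, the desired ending is written back as a separate `write` call rather than by string concatenation:

```java
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class MaskStream {

    // Hypothetical stand-in for the real parseLine(...) masking logic.
    static String maskLine(String line) {
        return line.replaceAll("\\d", "*");
    }

    // Points 1-3: buffered line reader, buffered charset-aware writer,
    // and no string concatenation to append the line ending.
    public static void mask(InputStream in, OutputStream out,
                            Charset charset, String lineEnding) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, charset));
        BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, charset));
        String line;
        while ((line = reader.readLine()) != null) {
            writer.write(maskLine(line));
            writer.write(lineEnding);   // separate write, not line + "\n"
        }
        writer.flush();
    }

    // Convenience wrapper for quick in-memory testing.
    public static String maskString(String input, String lineEnding) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            mask(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)),
                 out, StandardCharsets.UTF_8, lineEnding);
            return out.toString("UTF-8");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(maskString("a,1234\nb,5678\n", "\n"));
    }
}
```

Inside a real processor the same `mask` body would sit in the `StreamCallback.process` method, with NiFi supplying the streams.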
> >> >> >
> >> >> > Sharing some good news: I have successfully masked & loaded 110 GB
> >> >> > of data to HDFS using the flow below:
> >> >> >
> >> >> > ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) >
> >> >> > FetchFile (1 thread) > maskColumn (4 threads) > PutHDFS (1 thread)
> >> >> >
> >> >> > * I used 4 threads for masking and 1 for the others because I found
> >> >> > masking is the slowest component.
> >> >> >
> >> >> > However, it seems to be too slow: it was processing a 2GB file in 6
> >> >> > minutes. It may be because of my masking algorithm (although the
> >> >> > masking algorithm is pretty simple FPE with some simple twists).
> >> >> > However, I want to be sure that the way I have written the custom
> >> >> > processor is the most efficient way. Please see the code chunk below
> >> >> > and let me know whether it is the fastest way to process flowfiles
> >> >> > (csv source files) which need modifications on specific columns:
> >> >> >
> >> >> > * parseLine method contains the logic for masking.
> >> >> >
> >> >> > flowfile = session.write(flowfile, new StreamCallback() {
> >> >> >     @Override
> >> >> >     public void process(InputStream in, OutputStream out) throws IOException {
> >> >> >         BufferedReader reader = new BufferedReader(new InputStreamReader(in));
> >> >> >         String line;
> >> >> >         if (skipHeader == true && headerExists == true) {
> >> >> >             // to skip the header, do an additional line fetch before the next step
> >> >> >             if (reader.ready()) reader.readLine();
> >> >> >         } else if (skipHeader == false && headerExists == true) {
> >> >> >             // if the header is not skipped then no need to mask, just pass it through
> >> >> >             if (reader.ready())
> >> >> >                 out.write((reader.readLine() + "\n").getBytes());
> >> >> >         }
> >> >> >
> >> >> >         // decide about empty lines earlier
> >> >> >         while ((line = reader.readLine()) != null) {
> >> >> >             if (line.trim().length() > 0) {
> >> >> >                 out.write(parseLine(line, seperator, quote, escape, maskColumns).getBytes());
> >> >> >             }
> >> >> >         }
> >> >> >         out.flush();
> >> >> >     }
> >> >> > });
> >> >> >
> >> >> > Thanks in advance.
> >> >> > -Obaid
> >> >> >
> >> >> > On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >> >> >>
> >> >> >> Obaid,
> >> >> >>
> >> >> >> Really happy you're seeing the performance you need. That works out
> >> >> >> to about 110MB/s on average over that period. Any chance you have a
> >> >> >> 1GB NIC? If you really want to have fun with performance tuning you
> >> >> >> can use things like iostat and other commands to observe disk,
> >> >> >> network, and cpu. Something else to consider is the potential
> >> >> >> throughput gain of multiple RAID-1 containers rather than RAID-5,
> >> >> >> since NiFi can use both in parallel. It depends on your
> >> >> >> goals/workload, so just an FYI.
> >> >> >>
> >> >> >> A good reference for how to build a processor which does altering
> >> >> >> of the data (transformation) is here [1]. It is a good idea to do
> >> >> >> a quick read through that document. Also, one of the great things
> >> >> >> you can do is look at existing processors. Some good examples
> >> >> >> relevant to transformation are [2], [3], and [4], which are quite
> >> >> >> simple stream transform types. Or take a look at [5], which is a
> >> >> >> more complicated example. You might also be excited to know that
> >> >> >> there is some really cool work done to bring various languages
> >> >> >> into NiFi, which looks on track to be available in the upcoming
> >> >> >> 0.5.0 release: NIFI-210 [6]. That will provide a really great
> >> >> >> option to quickly build transforms using languages like Groovy,
> >> >> >> JRuby, Javascript, Scala, Lua, and Jython.
> >> >> >>
> >> >> >> [1] https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
> >> >> >> [2] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
> >> >> >> [3] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
> >> >> >> [4] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
> >> >> >> [5] https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
> >> >> >> [6] https://issues.apache.org/jira/browse/NIFI-210
> >> >> >>
> >> >> >> Thanks
> >> >> >> Joe
> >> >> >>
> >> >> >> On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim <obaidc...@gmail.com> wrote:
> >> >> >> > Hi Joe,
> >> >> >> >
> >> >> >> > Just completed my test with 100GB data (on a local RAID 5 disk on
> >> >> >> > a single server).
> >> >> >> >
> >> >> >> > I was able to load 100GB data within 15 minutes (awesome!!) using
> >> >> >> > the flow below. This throughput is enough to load 10TB data in a
> >> >> >> > day with a single and simple machine.
> >> >> >> > During the test, server disk I/O went up to 200MB/s.
> >> >> >> >
> >> >> >> > ExecuteProcess(touch and mv to input dir) > ListFile > FetchFile
> >> >> >> > (4 threads) > PutHDFS (4 threads)
> >> >> >> >
> >> >> >> > My next action is to incorporate my Java code for column masking
> >> >> >> > into a custom processor.
> >> >> >> > I am now exploring that. However, if you have any good reference
> >> >> >> > on custom processors (altering actual data), please let me know.
> >> >> >> >
> >> >> >> > Thanks,
> >> >> >> > Obaid
> >> >> >> >
> >> >> >> > On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim <obaidc...@gmail.com> wrote:
> >> >> >> >>
> >> >> >> >> Hi Joe,
> >> >> >> >>
> >> >> >> >> Yes, a symlink is another option I was thinking of when I was
> >> >> >> >> trying to use GetFile.
> >> >> >> >> Thanks for your insights. I will update you on this mail chain
> >> >> >> >> when my entire workflow completes.
So that this could be a reference for others :).
> >> >> >> >>
> >> >> >> >> -Obaid
> >> >> >> >>
> >> >> >> >> On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
> >> >> >> >>>
> >> >> >> >>> Obaid,
> >> >> >> >>>
> >> >> >> >>> You make a great point.
> >> >> >> >>>
> >> >> >> >>> I agree we will ultimately need to do more to make that very
> >> >> >> >>> valid approach work easily. The downside is that it puts the
> >> >> >> >>> onus on NiFi to keep track of a variety of potentially quite
> >> >> >> >>> large state about the directory. One way to avoid that expense
> >> >> >> >>> is if NiFi can pull a copy of, then delete, the source file.
> >> >> >> >>> If you'd like to keep a copy around, I wonder if a good
> >> >> >> >>> approach is to simply create a symlink to the original file
> >> >> >> >>> you want NiFi to pull, but have the symlink in the NiFi pickup
> >> >> >> >>> directory. NiFi is then free to read and delete, which means
> >> >> >> >>> it simply pulls whatever shows up in that directory and
> >> >> >> >>> doesn't have to keep state about filenames and checksums.
> >> >> >> >>>
> >> >> >> >>> I realize we still need to do what you're suggesting as well,
> >> >> >> >>> but thought I'd run this by you.
> >> >> >> >>>
> >> >> >> >>> Joe
> >> >> >> >>>
> >> >> >> >>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim <obaidc...@gmail.com> wrote:
> >> >> >> >>> > Hi Joe,
> >> >> >> >>> >
> >> >> >> >>> > Consider a scenario where we need to feed some older files
> >> >> >> >>> > and we are using "mv" to feed files to the input directory
> >> >> >> >>> > (to reduce IO we may use "mv"). If we use "mv", the last
> >> >> >> >>> > modified date will not change. And this is very common on a
> >> >> >> >>> > busy file collection system.
> >> >> >> >>> >
> >> >> >> >>> > However, I think I can still manage it by adding an
> >> >> >> >>> > additional "touch" before moving the file into the target
> >> >> >> >>> > directory.
> >> >> >> >>> >
> >> >> >> >>> > So, my suggestion is to add the file selection criteria as a
> >> >> >> >>> > configurable option in the ListFile processor. Options could
> >> >> >> >>> > be last modified date (as the current one), unique file
> >> >> >> >>> > names, checksum, etc.
> >> >> >> >>> >
> >> >> >> >>> > Thanks again man.
> >> >> >> >>> > -Obaid
> >> >> >> >>> >
> >> >> >> >>> > On Monday, January 4, 2016, Joe Witt <joe.w...@gmail.com> wrote:
> >> >> >> >>> >>
> >> >> >> >>> >> Hello Obaid,
> >> >> >> >>> >>
> >> >> >> >>> >> The default behavior of the ListFile processor is to keep
> >> >> >> >>> >> track of the last modified time of the files it lists. When
> >> >> >> >>> >> you changed the name of the file, that didn't change the
> >> >> >> >>> >> last modified time as tracked by the OS, but when you
> >> >> >> >>> >> altered the content it did. Simply running 'touch' on the
> >> >> >> >>> >> file would do it too.
> >> >> >> >>> >>
> >> >> >> >>> >> I believe we could observe the last modified time of the
> >> >> >> >>> >> directory in which the file lives to detect something like
> >> >> >> >>> >> a rename. However, we'd not know which file was renamed,
> >> >> >> >>> >> just that something was changed. So it would require
> >> >> >> >>> >> keeping some potentially problematic state to deconflict,
> >> >> >> >>> >> or requiring the user to have a duplicate detection process
> >> >> >> >>> >> afterwards.
> >> >> >> >>> >>
> >> >> >> >>> >> So with that in mind, is the current behavior sufficient
> >> >> >> >>> >> for your case?
> >> >> >> >>> >>
> >> >> >> >>> >> Thanks
> >> >> >> >>> >> Joe
> >> >> >> >>> >>
> >> >> >> >>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim <obaidc...@gmail.com> wrote:
> >> >> >> >>> >> > Hi Joe,
> >> >> >> >>> >> >
> >> >> >> >>> >> > I am now exploring your solution, starting with the flow below:
> >> >> >> >>> >> >
> >> >> >> >>> >> > ListFile > FetchFile > CompressContent > PutFile
> >> >> >> >>> >> >
> >> >> >> >>> >> > All seems fine, except for some confusion about how
> >> >> >> >>> >> > ListFile identifies new files.
> >> >> >> >>> >> > In order to test, I renamed an already processed file,
> >> >> >> >>> >> > put it in the input folder, and found that the file was
> >> >> >> >>> >> > not processed. Then I randomly changed the content of the
> >> >> >> >>> >> > file and it was immediately processed.
> >> >> >> >>> >> >
> >> >> >> >>> >> > My question is: what is the new-file selection criteria
> >> >> >> >>> >> > for "ListFile"? Can I change it to file name only?
> >> >> >> >>> >> >
> >> >> >> >>> >> > Thanks in advance.
> >> >> >> >>> >> > > >> >> >> >>> >> > -Obaid > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> >>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt > >> >> >> >>> >> > <joe.w...@gmail.com> > >> >> >> >>> >> > wrote: > >> >> >> >>> >> >> > >> >> >> >>> >> >> Hello Obaid, > >> >> >> >>> >> >> > >> >> >> >>> >> >> At 6 TB/day and average size of 2-3GB per dataset you're > >> >> >> >>> >> >> looking > >> >> >> >>> >> >> at > >> >> >> >>> >> >> a > >> >> >> >>> >> >> sustained rate of 70+MB/s and a pretty low transaction > >> >> >> >>> >> >> rate. > >> >> >> >>> >> >> So > >> >> >> >>> >> >> well > >> >> >> >>> >> >> within a good range to work with on a single system. > >> >> >> >>> >> >> > >> >> >> >>> >> >> 'I's there any way to by pass writing flow files on disk > >> >> >> >>> >> >> or > >> >> >> >>> >> >> directly > >> >> >> >>> >> >> pass those files to HDFS as it is ?" > >> >> >> >>> >> >> > >> >> >> >>> >> >> There is no way to bypass NiFi taking a copy of that > >> >> >> >>> >> >> data > >> >> >> >>> >> >> by > >> >> >> >>> >> >> design. > >> >> >> >>> >> >> NiFi is helping you formulate a graph of dataflow > >> >> >> >>> >> >> requirements > >> >> >> >>> >> >> from > >> >> >> >>> >> >> a > >> >> >> >>> >> >> given source(s) through given processing steps and > >> >> >> >>> >> >> ultimate > >> >> >> >>> >> >> driving > >> >> >> >>> >> >> data into given destination systems. As a result it > takes > >> >> >> >>> >> >> on > >> >> >> >>> >> >> the > >> >> >> >>> >> >> challenge of handling transactionality of each > interaction > >> >> >> >>> >> >> and > >> >> >> >>> >> >> the > >> >> >> >>> >> >> buffering and backpressure to deal with the realities of > >> >> >> >>> >> >> different > >> >> >> >>> >> >> production/consumption patterns. 
> >> >> >> >>> >> >> > >> >> >> >>> >> >> "If the files on the spool directory are > >> >> >> >>> >> >> compressed(zip/gzip), > >> >> >> >>> >> >> can > >> >> >> >>> >> >> we > >> >> >> >>> >> >> store files on HDFS as uncompressed ?" > >> >> >> >>> >> >> > >> >> >> >>> >> >> Certainly. Both of those formats (zip/gzip) are > >> >> >> >>> >> >> supported > >> >> >> >>> >> >> in > >> >> >> >>> >> >> NiFi > >> >> >> >>> >> >> out of the box. You simply run the data through the > >> >> >> >>> >> >> proper > >> >> >> >>> >> >> process > >> >> >> >>> >> >> prior to the PutHDFS process to unpack (zip) or > decompress > >> >> >> >>> >> >> (gzip) > >> >> >> >>> >> >> as > >> >> >> >>> >> >> needed. > >> >> >> >>> >> >> > >> >> >> >>> >> >> "2.a Can we use our existing java code for masking ? if > >> >> >> >>> >> >> yes > >> >> >> >>> >> >> then > >> >> >> >>> >> >> how ? > >> >> >> >>> >> >> 2.b For this Scenario we also want to bypass storing > flow > >> >> >> >>> >> >> files > >> >> >> >>> >> >> on > >> >> >> >>> >> >> disk. Can we do it on the fly, masking and storing on > HDFS > >> >> >> >>> >> >> ? > >> >> >> >>> >> >> 2.c If the source files are compressed (zip/gzip), is > >> >> >> >>> >> >> there > >> >> >> >>> >> >> any > >> >> >> >>> >> >> issue > >> >> >> >>> >> >> for masking here ?" > >> >> >> >>> >> >> > >> >> >> >>> >> >> You would build a custom NiFi processor that leverages > >> >> >> >>> >> >> your > >> >> >> >>> >> >> existing > >> >> >> >>> >> >> code. If your code is able to operate on an InputStream > >> >> >> >>> >> >> and > >> >> >> >>> >> >> writes > >> >> >> >>> >> >> to > >> >> >> >>> >> >> an OutputStream then it is very likely you'll be able to > >> >> >> >>> >> >> handle > >> >> >> >>> >> >> arbitrarily large objects with zero negative impact to > the > >> >> >> >>> >> >> JVM > >> >> >> >>> >> >> Heap > >> >> >> >>> >> >> as > >> >> >> >>> >> >> well. 
This is thanks to the fact that the data is > present > >> >> >> >>> >> >> in > >> >> >> >>> >> >> NiFi's > >> >> >> >>> >> >> repository with copy-on-write/pass-by-reference > semantics > >> >> >> >>> >> >> and > >> >> >> >>> >> >> that > >> >> >> >>> >> >> the > >> >> >> >>> >> >> API is exposing those streams to your code in a > >> >> >> >>> >> >> transactional > >> >> >> >>> >> >> manner. > >> >> >> >>> >> >> > >> >> >> >>> >> >> If you want the process of writing to HDFS to also do > >> >> >> >>> >> >> decompression > >> >> >> >>> >> >> and masking in one pass you'll need to extend/alter the > >> >> >> >>> >> >> PutHDFS > >> >> >> >>> >> >> process to do that. It is probably best to implement > the > >> >> >> >>> >> >> flow > >> >> >> >>> >> >> using > >> >> >> >>> >> >> cohesive processors (grab files, decompress files, mask > >> >> >> >>> >> >> files, > >> >> >> >>> >> >> write > >> >> >> >>> >> >> to hdfs). Given how the repository construct in NiFi > >> >> >> >>> >> >> works > >> >> >> >>> >> >> and > >> >> >> >>> >> >> given > >> >> >> >>> >> >> how caching in Linux works it is very possible you'll be > >> >> >> >>> >> >> quite > >> >> >> >>> >> >> surprised by the throughput you'll see. Even then you > can > >> >> >> >>> >> >> optimize > >> >> >> >>> >> >> once you're sure you need to. The other thing to keep > in > >> >> >> >>> >> >> mind > >> >> >> >>> >> >> here > >> >> >> >>> >> >> is > >> >> >> >>> >> >> that often a flow that starts out as specific as this > >> >> >> >>> >> >> turns > >> >> >> >>> >> >> into > >> >> >> >>> >> >> a > >> >> >> >>> >> >> great place to tap the stream of data to feed some new > >> >> >> >>> >> >> system > >> >> >> >>> >> >> or > >> >> >> >>> >> >> new > >> >> >> >>> >> >> algorithm with a different format or protocol. At that > >> >> >> >>> >> >> moment > >> >> >> >>> >> >> the > >> >> >> >>> >> >> benefits become even more obvious. 
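Joe's point that code operating on an InputStream and writing to an OutputStream handles arbitrarily large objects without heap pressure can be illustrated with a plain-Java streaming gzip decompress, of the kind the answer above describes running before PutHDFS. This is a minimal sketch under that assumption, not NiFi's actual CompressContent implementation:

```java
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class StreamDecompress {

    // Stream gzip-compressed input to uncompressed output in fixed-size
    // chunks, so memory use stays constant no matter how large the file is.
    public static long decompress(InputStream in, OutputStream out) throws IOException {
        long total = 0;
        try (GZIPInputStream gzip = new GZIPInputStream(in)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = gzip.read(buf)) != -1) {
                out.write(buf, 0, n);
                total += n;
            }
        }
        return total;
    }

    // In-memory round trip used for the demo: compress a string, then
    // decompress it back through the streaming path above.
    public static String roundTrip(String text) {
        try {
            ByteArrayOutputStream gz = new ByteArrayOutputStream();
            try (GZIPOutputStream g = new GZIPOutputStream(gz)) {
                g.write(text.getBytes("UTF-8"));
            }
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            decompress(new ByteArrayInputStream(gz.toByteArray()), plain);
            return plain.toString("UTF-8");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello hdfs")); // prints "hello hdfs"
    }
}
```

In a processor, `in` and `out` would be the streams NiFi hands to `StreamCallback.process`, which is why the fixed-size buffer keeps heap usage flat regardless of flowfile size.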
> >> >> >> >>> >> >> > >> >> >> >>> >> >> Regarding the Flume processes in NiFi and their memory > >> >> >> >>> >> >> usage. > >> >> >> >>> >> >> NiFi > >> >> >> >>> >> >> offers a nice hosting mechanism for the Flume processes > >> >> >> >>> >> >> and > >> >> >> >>> >> >> brings > >> >> >> >>> >> >> some of the benefits of NiFi's UI, provenance, > repository > >> >> >> >>> >> >> concept. > >> >> >> >>> >> >> However, we're still largely limited to the design > >> >> >> >>> >> >> assumptions > >> >> >> >>> >> >> one > >> >> >> >>> >> >> gets when building a Flume process and that can be quite > >> >> >> >>> >> >> memory > >> >> >> >>> >> >> limiting. We see what we have today as a great way to > >> >> >> >>> >> >> help > >> >> >> >>> >> >> people > >> >> >> >>> >> >> transition their existing Flume flows into NiFi by > >> >> >> >>> >> >> leveraging > >> >> >> >>> >> >> their > >> >> >> >>> >> >> existing code but would recommend working to phase the > use > >> >> >> >>> >> >> of > >> >> >> >>> >> >> those > >> >> >> >>> >> >> out in time so that you can take full benefit of what > NiFi > >> >> >> >>> >> >> brings > >> >> >> >>> >> >> over > >> >> >> >>> >> >> Flume. > >> >> >> >>> >> >> > >> >> >> >>> >> >> Thanks > >> >> >> >>> >> >> Joe > >> >> >> >>> >> >> > >> >> >> >>> >> >> > >> >> >> >>> >> >> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim > >> >> >> >>> >> >> <obaidc...@gmail.com> > >> >> >> >>> >> >> wrote: > >> >> >> >>> >> >> > Hi, > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > I am new in Nifi and exploring it as open source ETL > >> >> >> >>> >> >> > tool. > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > As per my understanding, flow files are stored on > local > >> >> >> >>> >> >> > disk > >> >> >> >>> >> >> > and > >> >> >> >>> >> >> > it > >> >> >> >>> >> >> > contains > >> >> >> >>> >> >> > actual data. 
> >> >> >> >>> >> >> > If above is true, lets consider a below scenario: > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > Scenario 1: > >> >> >> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day) of > >> >> >> >>> >> >> > files > >> >> >> >>> >> >> > coming > >> >> >> >>> >> >> > from > >> >> >> >>> >> >> > external sources > >> >> >> >>> >> >> > - I want to push those files to HDFS as it is without > >> >> >> >>> >> >> > any > >> >> >> >>> >> >> > changes > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > Scenario 2: > >> >> >> >>> >> >> > - In a spool directory we have terabytes(5-6TB/day) of > >> >> >> >>> >> >> > files > >> >> >> >>> >> >> > coming > >> >> >> >>> >> >> > from > >> >> >> >>> >> >> > external sources > >> >> >> >>> >> >> > - I want to mask some of the sensitive columns > >> >> >> >>> >> >> > - Then send one copy to HDFS and another copy to Kafka > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > Question for Scenario 1: > >> >> >> >>> >> >> > 1.a In that case those 5-6TB data will be again > written > >> >> >> >>> >> >> > on > >> >> >> >>> >> >> > local > >> >> >> >>> >> >> > disk > >> >> >> >>> >> >> > as > >> >> >> >>> >> >> > flow files and will cause double I/O. Which eventually > >> >> >> >>> >> >> > may > >> >> >> >>> >> >> > cause > >> >> >> >>> >> >> > slower > >> >> >> >>> >> >> > performance due to I/O bottleneck. > >> >> >> >>> >> >> > Is there any way to by pass writing flow files on disk > >> >> >> >>> >> >> > or > >> >> >> >>> >> >> > directly > >> >> >> >>> >> >> > pass > >> >> >> >>> >> >> > those files to HDFS as it is ? > >> >> >> >>> >> >> > 1.b If the files on the spool directory are > >> >> >> >>> >> >> > compressed(zip/gzip), > >> >> >> >>> >> >> > can > >> >> >> >>> >> >> > we > >> >> >> >>> >> >> > store files on HDFS as uncompressed ? > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > Question for Scenario 2: > >> >> >> >>> >> >> > 2.a Can we use our existing java code for masking ? 
if > >> >> >> >>> >> >> > yes > >> >> >> >>> >> >> > then > >> >> >> >>> >> >> > how ? > >> >> >> >>> >> >> > 2.b For this Scenario we also want to bypass storing > >> >> >> >>> >> >> > flow > >> >> >> >>> >> >> > files > >> >> >> >>> >> >> > on > >> >> >> >>> >> >> > disk. > >> >> >> >>> >> >> > Can > >> >> >> >>> >> >> > we do it on the fly, masking and storing on HDFS ? > >> >> >> >>> >> >> > 2.c If the source files are compressed (zip/gzip), is > >> >> >> >>> >> >> > there > >> >> >> >>> >> >> > any > >> >> >> >>> >> >> > issue > >> >> >> >>> >> >> > for > >> >> >> >>> >> >> > masking here ? > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > In fact, I tried above using flume+flume interceptors. > >> >> >> >>> >> >> > Everything > >> >> >> >>> >> >> > working > >> >> >> >>> >> >> > fine with smaller files. But when source files greater > >> >> >> >>> >> >> > that > >> >> >> >>> >> >> > 50MB > >> >> >> >>> >> >> > flume > >> >> >> >>> >> >> > chocks :(. > >> >> >> >>> >> >> > So, I am exploring options in NiFi. Hope I will get > some > >> >> >> >>> >> >> > guideline > >> >> >> >>> >> >> > from > >> >> >> >>> >> >> > you > >> >> >> >>> >> >> > guys. > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > > >> >> >> >>> >> >> > Thanks in advance. > >> >> >> >>> >> >> > -Obaid > >> >> >> >>> >> > > >> >> >> >>> >> > > >> >> >> > > >> >> >> > > >> >> > > >> >> > > >> > > >> > > > > > >
[etl@sthdmgt1-pvt nifi]$ jps
41616 RunJar
54865 FsShell
55476 FsShell
48118 RunJar
41497 RunJar
55706 Jps
4733 RunNiFi
4862 NiFi

[etl@sthdmgt1-pvt nifi]$ jstat -gcutil 4862 1000
  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT
 28.12   0.00  73.42  38.76  95.44  90.23   6520   29.834     3    0.228   30.063
  0.00  28.12  53.58  38.76  95.44  90.23   6523   29.849     3    0.228   30.077
 43.75   0.00  29.34  38.76  95.44  90.23   6526   29.864     3    0.228   30.092
  0.00  18.75   5.45  38.76  95.44  90.23   6529   29.879     3    0.228   30.108
  0.00  43.75  85.98  38.77  95.44  90.23   6531   29.889     3    0.228   30.117
 29.69   0.00  60.71  38.77  95.44  90.23   6534   29.904     3    0.228   30.132
  0.00  18.75  35.86  38.78  95.44  90.23   6537   29.918     3    0.228   30.146
 32.81   0.00  19.34  38.78  95.44  90.23   6540   29.933     3    0.228   30.161
  0.00  37.50   5.96  38.78  95.44  90.23   6543   29.948     3    0.228   30.176
  0.00  32.81  85.03  38.78  95.44  90.23   6545   29.959     3    0.228   30.187
 37.50   0.00  62.76  38.79  95.44  90.23   6548   29.974     3    0.228   30.203
  0.00  34.38  39.26  38.79  95.44  90.23   6551   29.990     3    0.228   30.218
 46.88   0.00  19.40  38.79  95.44  90.23   6554   30.005     3    0.228   30.233
  0.00  26.56   0.00  38.79  95.44  90.23   6557   30.019     3    0.228   30.247
  0.00  10.94  68.94  38.80  95.44  90.23   6559   30.029     3    0.228   30.257
 31.25   0.00   3.96  38.80  95.44  90.23   6560   30.034     3    0.228   30.262
  0.00  29.69  89.64  38.80  95.44  90.23   6561   30.039     3    0.228   30.268
 81.25   0.00   1.00  38.80  95.44  90.23   6562   30.045     3    0.228   30.273
  0.00  34.38  55.85  38.80  95.44  90.23   6563   30.050     3    0.228   30.279
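The collection counts and accumulated time that jstat reports above (YGC/FGC and the GCT columns) can also be read from inside the JVM via the standard management API. A minimal sketch, not anything NiFi exposes itself, which could be dropped into any diagnostic code:

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcStats {

    // Sum collection counts across all collectors (young + old generation),
    // roughly the YGC + FGC totals jstat -gcutil shows.
    public static long totalGcCount() {
        long count = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long c = gc.getCollectionCount();
            if (c > 0) count += c;   // -1 means the count is undefined
        }
        return count;
    }

    // Sum accumulated collection time in milliseconds, roughly jstat's GCT.
    public static long totalGcTimeMillis() {
        long time = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long t = gc.getCollectionTime();
            if (t > 0) time += t;    // -1 means the time is undefined
        }
        return time;
    }

    public static void main(String[] args) {
        System.out.println("collections=" + totalGcCount()
                + " timeMs=" + totalGcTimeMillis());
    }
}
```

Sampling these totals once per second would reproduce the trend visible in the jstat output: here roughly 30 seconds of GC time against 6500+ young collections, which is what points at the undersized heap.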
[etl@sthdmgt1-pvt nifi]$ iostat xmh 1
Linux 2.6.32-358.el6.x86_64 (sthdmgt1-pvt.aiu.axiata)   01/14/2016   _x86_64_   (24 CPU)

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
           6.52    0.00    0.81    0.04    0.00   92.62
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          22.08    0.00    3.02   12.76    0.00   62.13
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          24.09    0.00    4.66    7.26    0.00   63.99
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          24.71    0.00    3.54    4.46    0.00   67.30
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          20.46    0.00    1.98    1.22    0.00   76.34
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          20.15    0.00    2.06    1.01    0.00   76.78
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          20.63    0.00    2.23    1.05    0.00   76.09
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          19.53    0.00    1.05    1.01    0.00   78.42
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          19.10    0.00    1.17    0.17    0.00   79.56
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          19.71    0.00    1.43    0.21    0.00   78.66
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          18.96    0.00    1.34    0.59    0.00   79.11
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          20.60    0.00    1.72    1.34    0.00   76.34
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn

avg-cpu:  %user   %nice %system %iowait  %steal   %idle
          20.07    0.00    1.21    1.09    0.00   77.63
Device:            tps   Blk_read/s   Blk_wrtn/s   Blk_read   Blk_wrtn
^C
[etl@sthdmgt1-pvt nifi]$