Re: Data Ingestion for Large Source Files and Masking

2016-01-22 Thread obaidul karim
Hi Joe,

I have created a JIRA, NIFI-1432, as a new feature request
(Efficient CSV processor) with some recommendations, and I am sharing my own
code.
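
For anyone following along, below is a toy sketch of the kind of per-line
column masking being discussed. The names and the digit-shift mask are
illustrative only; they are not the FPE logic attached to the JIRA.

import java.util.Set;
import java.util.regex.Pattern;

class MaskSketch {
    // Simplified parseLine: split on the separator, mask the configured
    // column indexes, and re-join. Real CSV handling must also honor the
    // quote and escape characters.
    static String parseLine(String line, char sep, Set<Integer> maskColumns) {
        String[] cols = line.split(Pattern.quote(String.valueOf(sep)), -1);
        for (int i = 0; i < cols.length; i++) {
            if (maskColumns.contains(i)) {
                cols[i] = mask(cols[i]);
            }
        }
        return String.join(String.valueOf(sep), cols);
    }

    // Toy format-preserving substitution: digits map to digits, everything
    // else is left alone. A real FPE scheme would be keyed.
    static String mask(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            sb.append(Character.isDigit(c) ? (char) ('0' + (c - '0' + 5) % 10) : c);
        }
        return sb.toString();
    }
}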


-Obaid

On Thu, Jan 14, 2016 at 2:14 PM, obaidul karim  wrote:

> Joe,
> I am doing some optimizations on my CSV processing.
> Let me clear those out, and then I will share the final version.
>
> -Obaid
>
>
> On Thursday, January 14, 2016, Joe Witt  wrote:
>
>> Quick observations for now off the latest data:
>> - GC looks pretty good though it is surprising there were any full GCs
>> during that short test
>> - cpu has low utilization
>> - disk has low utilization
>>
>> Can you share your sample input data, processor code, and flow as a
>> template?  Attaching them to a JIRA, for example, could be a good way.  We
>> can use this as a good example of how someone can
>> troubleshoot/optimize.
>>
>> Thanks
>> Joe
>>
>> On Thu, Jan 14, 2016 at 1:00 AM, obaidul karim 
>> wrote:
>> > Joe,
>> >
>> > Last time it was below:
>> > java.arg.2=-Xms512m
>> > java.arg.3=-Xmx512m
>> >
>> > Now I have made them as below:
>> > java.arg.2=-Xms5120m
>> > java.arg.3=-Xmx10240m
>> >
>> > The latest jstat & iostat output are attached.
>> > To me it is still slow, no significant improvements.
>> >
>> > -Obaid
>> >
>> > On Thu, Jan 14, 2016 at 12:41 PM, Joe Witt  wrote:
>> >>
>> >> Obaid,
>> >>
>> >> Great so this is helpful info.  Iostat output shows both CPU and disk
>> >> are generally bored and ready for more work.  Looking at the gc output
>> >> though suggests trouble.  We see 32 samples at 1-second intervals, and
>> >> in that time it spent more than 6 seconds doing
>> >> garbage collection, including 5 full collections.  That is usually a
>> >> sign of inefficient heap usage and/or simply an undersized heap.  What
>> >> size do you have your heap settings at in the conf/bootstrap.conf
>> >> file?
>> >>
>> >> Thanks
>> >> Joe
>> >>
>> >> On Wed, Jan 13, 2016 at 11:32 PM, obaidul karim 
>> >> wrote:
>> >> > Hi Joe,
>> >> >
>> >> > Please find attached jstat & iostat output.
>> >> >
>> >> > So far it seems to me that it is CPU bound. However, your eyes are
>> >> > better than mine :).
>> >> >
>> >> > -Obaid
>> >> >
>> >> > On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt 
>> wrote:
>> >> >>
>> >> >> Hello
>> >> >>
>> >> >> Let's narrow in on potential issues.  So while this process is
>> running
>> >> >> and appears sluggish in nature please run the following on the
>> command
>> >> >> line
>> >> >>
>> >> >> 'jps'
>> >> >>
>> >> >> This command will tell you the process id of NiFi.  You'll want the
>> >> >> pid associated with the Java process other than what is called 'jps'
>> >> >> presuming there aren't other things running than NiFi at the time.
>> >> >>
>> >> >> Let's say the result is a pid of '12345'
>> >> >>
>> >> >> Then run this command
>> >> >>
>> >> >> 'jstat -gcutil 12345 1000'
>> >> >>
>> >> >> This will generate garbage collection information every one second
>> >> >> until you decide to stop it with cntl-c.  So let that run for a
>> while
>> >> >> say 30 seconds or so then hit cntl-c.  Can you please paste that
>> >> >> output in response.  That will show us how the general health of GC
>> >> >> is.
>> >> >>
>> >> >> Another really important/powerful set of output can be gleaned by
>> >> >> running 'iostat' which gives you statistics about input/output to
>> >> >> things like the underlying storage system.  That is part of the
>> >> >> 'sysstat' package in case you need to install that.  But then you
>> can
>> >> >> run
>> >> >>
>> >> >> 'iostat -xmh 1'
>> >> >>
>> >> >> Or something even as simple as 'iostat 1'.  Your specific command
>> >> >> string may vary.  Please let that run for say 10-20 seconds and
>> paste
>> >> >> those results as well.  That will give a sense of io utilization
>> while
>> >> >> the operation is running.
>> >> >>
>> >> >> Between these two outputs (Garbage Collection/IO) we should have a
>> >> >> pretty good idea of where to focus the effort to find why it is
>> slow.
>> >> >>
>> >> >> Thanks
>> >> >> Joe
>> >> >>
>> >> >>
>> >> >> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim
>> >> >> wrote:
>> >> >> > Hi Joe & Others,
>> >> >> >
>> >> >> > Thanks for all of your suggestions.
>> >> >> >
>> >> >> > Now I am using the below code:
>> >> >> > 1. Buffered reader (I tried to use NLKBufferedReader, but it
>> requires
>> >> >> > too many libs & NiFi failed to start. I was lost.)
>> >> >> > 2. Buffered writer
>> >> >> > 3. Appending the line ending instead of concatenating a newline
>> >> >> >
>> >> >> > Still no performance gain. Am I doing something wrong? Is there
>> >> >> > anything else I can change here?
>> >> >> >
>> >> >> > flowfile = session.write(flowfile, new StreamCallback() {
>> >> >> > @Override
>> >> >> > public void process(InputStream in, 

Re: Data Ingestion for Large Source Files and Masking

2016-01-13 Thread Joe Witt
Quick observations for now off the latest data:
- GC looks pretty good though it is surprising there were any full GCs
during that short test
- cpu has low utilization
- disk has low utilization

Can you share your sample input data, processor code, and flow as a
template?  Attaching them to a JIRA, for example, could be a good way.  We
can use this as a good example of how someone can
troubleshoot/optimize.

Thanks
Joe

On Thu, Jan 14, 2016 at 1:00 AM, obaidul karim  wrote:
> Joe,
>
> Last time it was below:
> java.arg.2=-Xms512m
> java.arg.3=-Xmx512m
>
> Now I have made them as below:
> java.arg.2=-Xms5120m
> java.arg.3=-Xmx10240m
>
> The latest jstat & iostat output are attached.
> To me it is still slow, no significant improvements.
>
> -Obaid
>
> On Thu, Jan 14, 2016 at 12:41 PM, Joe Witt  wrote:
>>
>> Obaid,
>>
>> Great so this is helpful info.  Iostat output shows both CPU and disk
>> are generally bored and ready for more work.  Looking at the gc output
>> though suggests trouble.  We see 32 samples at 1-second intervals, and
>> in that time it spent more than 6 seconds doing
>> garbage collection, including 5 full collections.  That is usually a
>> sign of inefficient heap usage and/or simply an undersized heap.  What
>> size do you have your heap settings at in the conf/bootstrap.conf
>> file?
>>
>> Thanks
>> Joe
>>
>> On Wed, Jan 13, 2016 at 11:32 PM, obaidul karim 
>> wrote:
>> > Hi Joe,
>> >
>> > Please find attached jstat & iostat output.
>> >
>> > So far it seems to me that it is CPU bound. However, your eyes are
>> > better than mine :).
>> >
>> > -Obaid
>> >
>> > On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt  wrote:
>> >>
>> >> Hello
>> >>
>> >> Let's narrow in on potential issues.  So while this process is running
>> >> and appears sluggish in nature please run the following on the command
>> >> line
>> >>
>> >> 'jps'
>> >>
>> >> This command will tell you the process id of NiFi.  You'll want the
>> >> pid associated with the Java process other than what is called 'jps'
>> >> presuming there aren't other things running than NiFi at the time.
>> >>
>> >> Let's say the result is a pid of '12345'
>> >>
>> >> Then run this command
>> >>
>> >> 'jstat -gcutil 12345 1000'
>> >>
>> >> This will generate garbage collection information every one second
>> >> until you decide to stop it with cntl-c.  So let that run for a while
>> >> say 30 seconds or so then hit cntl-c.  Can you please paste that
>> >> output in response.  That will show us how the general health of GC
>> >> is.
>> >>
>> >> Another really important/powerful set of output can be gleaned by
>> >> running 'iostat' which gives you statistics about input/output to
>> >> things like the underlying storage system.  That is part of the
>> >> 'sysstat' package in case you need to install that.  But then you can
>> >> run
>> >>
>> >> 'iostat -xmh 1'
>> >>
>> >> Or something even as simple as 'iostat 1'.  Your specific command
>> >> string may vary.  Please let that run for say 10-20 seconds and paste
>> >> those results as well.  That will give a sense of io utilization while
>> >> the operation is running.
>> >>
>> >> Between these two outputs (Garbage Collection/IO) we should have a
>> >> pretty good idea of where to focus the effort to find why it is slow.
>> >>
>> >> Thanks
>> >> Joe
>> >>
>> >>
>> >> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim 
>> >> wrote:
>> >> > Hi Joe & Others,
>> >> >
>> >> > Thanks for all of your suggestions.
>> >> >
>> >> > Now I am using the below code:
>> >> > 1. Buffered reader (I tried to use NLKBufferedReader, but it requires
>> >> > too many libs & NiFi failed to start. I was lost.)
>> >> > 2. Buffered writer
>> >> > 3. Appending the line ending instead of concatenating a newline
>> >> >
>> >> > Still no performance gain. Am I doing something wrong? Is there
>> >> > anything else I can change here?
>> >> >
>> >> > flowfile = session.write(flowfile, new StreamCallback() {
>> >> > @Override
>> >> > public void process(InputStream in, OutputStream out) throws
>> >> > IOException
>> >> > {
>> >> > try (BufferedReader reader = new BufferedReader(new
>> >> > InputStreamReader(in, charset), maxBufferSize);
>> >> > BufferedWriter writer = new BufferedWriter(new
>> >> > OutputStreamWriter(out, charset));) {
>> >> >
>> >> > if(skipHeader == true && headerExists==true) { // to skip header, do
>> >> > an
>> >> > additional line fetch before going to next step
>> >> > if(reader.ready())   reader.readLine();
>> >> > } else if( skipHeader == false && headerExists == true) { // if
>> >> > header
>> >> > is
>> >> > not skipped then no need to mask, just pass through
>> >> > if(reader.ready())  {
>> >> > writer.write(reader.readLine());
>> >> > writer.write(lineEndingBuilder.toString());
>> >> > }
>> >> > }
>> >> > // decide about empty line earlier
>> >> > String line;
>> >> > while ((line = reader.readLine()) != null) 

Re: Data Ingestion for Large Source Files and Masking

2016-01-13 Thread obaidul karim
Joe,

Last time it was below:
java.arg.2=-Xms512m
java.arg.3=-Xmx512m

Now I have made them as below:
java.arg.2=-Xms5120m
java.arg.3=-Xmx10240m

The latest jstat & iostat output are attached.
To me it is still slow; no significant improvement.

-Obaid

On Thu, Jan 14, 2016 at 12:41 PM, Joe Witt  wrote:

> Obaid,
>
> Great so this is helpful info.  Iostat output shows both CPU and disk
> are generally bored and ready for more work.  Looking at the gc output
> though suggests trouble.  We see 32 samples at 1-second intervals, and
> in that time it spent more than 6 seconds doing
> garbage collection, including 5 full collections.  That is usually a
> sign of inefficient heap usage and/or simply an undersized heap.  What
> size do you have your heap settings at in the conf/bootstrap.conf
> file?
>
> Thanks
> Joe
>
> On Wed, Jan 13, 2016 at 11:32 PM, obaidul karim 
> wrote:
> > Hi Joe,
> >
> > Please find attached jstat & iostat output.
> >
> > So far it seems to me that it is CPU bound. However, your eyes are better
> > than mine :).
> >
> > -Obaid
> >
> > On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt  wrote:
> >>
> >> Hello
> >>
> >> Let's narrow in on potential issues.  So while this process is running
> >> and appears sluggish in nature please run the following on the command
> >> line
> >>
> >> 'jps'
> >>
> >> This command will tell you the process id of NiFi.  You'll want the
> >> pid associated with the Java process other than what is called 'jps'
> >> presuming there aren't other things running than NiFi at the time.
> >>
> >> Let's say the result is a pid of '12345'
> >>
> >> Then run this command
> >>
> >> 'jstat -gcutil 12345 1000'
> >>
> >> This will generate garbage collection information every one second
> >> until you decide to stop it with cntl-c.  So let that run for a while
> >> say 30 seconds or so then hit cntl-c.  Can you please paste that
> >> output in response.  That will show us how the general health of GC
> >> is.
> >>
> >> Another really important/powerful set of output can be gleaned by
> >> running 'iostat' which gives you statistics about input/output to
> >> things like the underlying storage system.  That is part of the
> >> 'sysstat' package in case you need to install that.  But then you can
> >> run
> >>
> >> 'iostat -xmh 1'
> >>
> >> Or something even as simple as 'iostat 1'.  Your specific command
> >> string may vary.  Please let that run for say 10-20 seconds and paste
> >> those results as well.  That will give a sense of io utilization while
> >> the operation is running.
> >>
> >> Between these two outputs (Garbage Collection/IO) we should have a
> >> pretty good idea of where to focus the effort to find why it is slow.
> >>
> >> Thanks
> >> Joe
> >>
> >>
> >> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim 
> >> wrote:
> >> > Hi Joe & Others,
> >> >
> >> > Thanks for all of your suggestions.
> >> >
> >> > Now I am using the below code:
> >> > 1. Buffered reader (I tried to use NLKBufferedReader, but it requires
> >> > too many libs & NiFi failed to start. I was lost.)
> >> > 2. Buffered writer
> >> > 3. Appending the line ending instead of concatenating a newline
> >> >
> >> > Still no performance gain. Am I doing something wrong? Is there
> >> > anything else I can change here?
> >> >
> >> > flowfile = session.write(flowfile, new StreamCallback() {
> >> > @Override
> >> > public void process(InputStream in, OutputStream out) throws
> IOException
> >> > {
> >> > try (BufferedReader reader = new BufferedReader(new
> >> > InputStreamReader(in, charset), maxBufferSize);
> >> > BufferedWriter writer = new BufferedWriter(new
> >> > OutputStreamWriter(out, charset));) {
> >> >
> >> > if(skipHeader == true && headerExists==true) { // to skip header, do
> an
> >> > additional line fetch before going to next step
> >> > if(reader.ready())   reader.readLine();
> >> > } else if( skipHeader == false && headerExists == true) { // if header
> >> > is
> >> > not skipped then no need to mask, just pass through
> >> > if(reader.ready())  {
> >> > writer.write(reader.readLine());
> >> > writer.write(lineEndingBuilder.toString());
> >> > }
> >> > }
> >> > // decide about empty line earlier
> >> > String line;
> >> > while ((line = reader.readLine()) != null) {
> >> > writer.write(parseLine(line, seperator, quote, escape, maskColumns));
> >> > writer.write(lineEndingBuilder.toString());
> >> > };
> >> > writer.flush();
> >> > }
> >> > }
> >> >
> >> > });
> >> >
> >> >
> >> > -Obaid
> >> >
> >> > On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt  wrote:
> >> >>
> >> >> Hello
> >> >>
> >> >> So the performance went from what sounded pretty good to what sounds
> >> >> pretty problematic.  The rate now sounds like it is around 5MB/s
> which
> >> >> is indeed quite poor.  Building on what Bryan said there does appear
> >> >> to be some good opportunities to improve the performance.  The 

Re: Data Ingestion for Large Source Files and Masking

2016-01-13 Thread obaidul karim
Hi Joe & Others,

Thanks for all of your suggestions.

Now I am using the below code:
1. Buffered reader (I tried to use NLKBufferedReader, but it requires too
many libs & NiFi failed to start. I was lost.)
2. Buffered writer
3. Appending the line ending instead of concatenating a newline

Still no performance gain. Am I doing something wrong? Is there anything else
I can change here?

flowfile = session.write(flowfile, new StreamCallback() {
    @Override
    public void process(InputStream in, OutputStream out) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                 new InputStreamReader(in, charset), maxBufferSize);
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(out, charset))) {

            if (skipHeader && headerExists) {
                // to skip the header, do an additional line fetch before the next step
                if (reader.ready()) reader.readLine();
            } else if (!skipHeader && headerExists) {
                // if the header is not skipped there is no need to mask it; pass it through
                if (reader.ready()) {
                    writer.write(reader.readLine());
                    writer.write(lineEndingBuilder.toString());
                }
            }

            // decide about empty lines earlier
            String line;
            while ((line = reader.readLine()) != null) {
                writer.write(parseLine(line, seperator, quote, escape, maskColumns));
                writer.write(lineEndingBuilder.toString());
            }
            writer.flush();
        }
    }
});
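
For later readers, here is a consolidated sketch of the thread's suggestions
so far: buffered, charset-aware reader and writer, an explicit buffer size on
both sides, and write(String) for the line ending instead of concatenation.
The fields (charset, maxBufferSize, skipHeader, headerExists, parseLine, and
a precomputed lineEnding String in place of lineEndingBuilder.toString()) are
assumed from the snippet above; the header handling is restructured but
equivalent.

flowfile = session.write(flowfile, new StreamCallback() {
    @Override
    public void process(InputStream in, OutputStream out) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                 new InputStreamReader(in, charset), maxBufferSize);
             BufferedWriter writer = new BufferedWriter(
                 new OutputStreamWriter(out, charset), maxBufferSize)) {

            String line = reader.readLine();
            if (line != null && headerExists) {
                if (!skipHeader) {
                    writer.write(line);       // header kept: pass through unmasked
                    writer.write(lineEnding);
                }
                line = reader.readLine();     // either way, the header is consumed
            }
            for (; line != null; line = reader.readLine()) {
                writer.write(parseLine(line, seperator, quote, escape, maskColumns));
                writer.write(lineEnding);     // write(String): no "+" concatenation
            }
            writer.flush();
        }
    }
});

One caveat: BufferedReader.readLine() drops the original line terminator,
which is why Joe's ReplaceText reference uses a newline-keeping reader; if the
output must preserve mixed line endings exactly, that reader is the safer
choice.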


-Obaid

On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt  wrote:

> Hello
>
> So the performance went from what sounded pretty good to what sounds
> pretty problematic.  The rate now sounds like it is around 5MB/s which
> is indeed quite poor.  Building on what Bryan said there does appear
> to be some good opportunities to improve the performance.  The link he
> provided just expanded to cover the full range to look at is here [1].
>
> A couple of key points to note:
> 1) Use a buffered line-oriented reader that preserves the new lines
> 2) Write to a buffered writer that accepts strings and understands
> which charset you intend to write out
> 3) Avoid string concat with newline
>
> Also keep in mind how large any single line could be, because if
> they can be quite large you may need to consider the GC pressure that
> can be caused.  But let's take a look at how things are after these
> easier steps first.
>
> [1]
> https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361
>
> Thanks
> Joe
>
> On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros 
> wrote:
> > Obaid,
> >
> > Since you mention that you will have dedicated ETL servers, and I assume
> they
> > will also have a decent amount of RAM on them, I would not shy away
> > from increasing your threads.
> >
> > Also, in your staging directory, if you do not need to keep originals, you
> > might consider GetFile, and on that one use one thread.
> >
> > Hi Joe,
> >
> > Yes, I took into consideration the existing RAID and HW settings. We have 10G
> NIC
> > for all hadoop intra-connectivity and the server in question is an edge
> node
> > of our hadoop cluster.
> > In production scenarios we will use dedicated ETL servers having
> > high-performance (>500MB/s) local disks.
> >
> > Sharing some good news: I have successfully masked & loaded 110 GB of data
> > to HDFS using the below flow:
> >
> > ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) >
> FetchFile
> > (1 thread) > maskColumn (4 threads) > PutHDFS (1 thread).
> >
> > * Used 4 threads for masking and 1 for the others because I found it is the
> > slowest component.
> >
> > However, it seems to be too slow. It was processing 2GB files in 6
> minutes.
> > It may be because of my masking algorithm (although the masking algorithm is
> > a pretty simple FPE with some simple twists).
> > However, I want to be sure that the way I have written the custom processor is
> > the most efficient way. Please see the below code chunk and let me know whether
> it
> > is the fastest way to process flowfiles (CSV source files) which need
> > modifications on specific columns:
> >
> > * parseLine method contains logic for masking.
> >
> >flowfile = session.write(flowfile, new StreamCallback() {
> > @Override
> >public void process(InputStream in, OutputStream out) throws
> > IOException {
> >
> > BufferedReader reader = new BufferedReader(new
> > InputStreamReader(in));
> > String line;
> > if(skipHeader == true && headerExists==true) { // to skip
> header, do
> > an additional line fetch before going to next step
> > if(reader.ready())   reader.readLine();
> > } else if( skipHeader == false && headerExists == true) { // if
> > header is not skipped then no need to mask, just pass through
> > if(reader.ready())
>  out.write((reader.readLine()+"\n").getBytes());
> > }
> >
> > // decide about empty line earlier
> > while ((line = 

Re: Data Ingestion for Large Source Files and Masking

2016-01-13 Thread obaidul karim
Hi Joe,

Please find attached jstat & iostat output.

So far it seems to me that it is CPU bound. However, your eyes are better
than mine :).

-Obaid

On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt  wrote:

> Hello
>
> Let's narrow in on potential issues.  So while this process is running
> and appears sluggish in nature please run the following on the command
> line
>
> 'jps'
>
> This command will tell you the process id of NiFi.  You'll want the
> pid associated with the Java process other than what is called 'jps'
> presuming there aren't other things running than NiFi at the time.
>
> Let's say the result is a pid of '12345'
>
> Then run this command
>
> 'jstat -gcutil 12345 1000'
>
> This will generate garbage collection information every one second
> until you decide to stop it with cntl-c.  So let that run for a while
> say 30 seconds or so then hit cntl-c.  Can you please paste that
> output in response.  That will show us how the general health of GC
> is.
>
> Another really important/powerful set of output can be gleaned by
> running 'iostat' which gives you statistics about input/output to
> things like the underlying storage system.  That is part of the
> 'sysstat' package in case you need to install that.  But then you can
> run
>
> 'iostat -xmh 1'
>
> Or something even as simple as 'iostat 1'.  Your specific command
> string may vary.  Please let that run for say 10-20 seconds and paste
> those results as well.  That will give a sense of io utilization while
> the operation is running.
>
> Between these two outputs (Garbage Collection/IO) we should have a
> pretty good idea of where to focus the effort to find why it is slow.
>
> Thanks
> Joe
>
>
> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim 
> wrote:
> > Hi Joe & Others,
> >
> > Thanks for all of your suggestions.
> >
> > Now I am using the below code:
> > 1. Buffered reader (I tried to use NLKBufferedReader, but it requires too
> > many libs & NiFi failed to start. I was lost.)
> > 2. Buffered writer
> > 3. Appending the line ending instead of concatenating a newline
> >
> > Still no performance gain. Am I doing something wrong? Is there anything
> > else I can change here?
> >
> > flowfile = session.write(flowfile, new StreamCallback() {
> > @Override
> > public void process(InputStream in, OutputStream out) throws IOException
> {
> > try (BufferedReader reader = new BufferedReader(new
> > InputStreamReader(in, charset), maxBufferSize);
> > BufferedWriter writer = new BufferedWriter(new
> > OutputStreamWriter(out, charset));) {
> >
> > if(skipHeader == true && headerExists==true) { // to skip header, do an
> > additional line fetch before going to next step
> > if(reader.ready())   reader.readLine();
> > } else if( skipHeader == false && headerExists == true) { // if header is
> > not skipped then no need to mask, just pass through
> > if(reader.ready())  {
> > writer.write(reader.readLine());
> > writer.write(lineEndingBuilder.toString());
> > }
> > }
> > // decide about empty line earlier
> > String line;
> > while ((line = reader.readLine()) != null) {
> > writer.write(parseLine(line, seperator, quote, escape, maskColumns));
> > writer.write(lineEndingBuilder.toString());
> > };
> > writer.flush();
> > }
> > }
> >
> > });
> >
> >
> > -Obaid
> >
> > On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt  wrote:
> >>
> >> Hello
> >>
> >> So the performance went from what sounded pretty good to what sounds
> >> pretty problematic.  The rate now sounds like it is around 5MB/s which
> >> is indeed quite poor.  Building on what Bryan said there does appear
> >> to be some good opportunities to improve the performance.  The link he
> >> provided just expanded to cover the full range to look at is here [1].
> >>
> >> A couple of key points to note:
> >> 1) Use a buffered line-oriented reader that preserves the new lines
> >> 2) Write to a buffered writer that accepts strings and understands
> >> which charset you intend to write out
> >> 3) Avoid string concat with newline
> >>
> >> Also keep in mind how large any single line could be, because if
> >> they can be quite large you may need to consider the GC pressure that
> >> can be caused.  But let's take a look at how things are after these
> >> easier steps first.
> >>
> >> [1]
> >>
> https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L334-L361
> >>
> >> Thanks
> >> Joe
> >>
> >> On Tue, Jan 12, 2016 at 10:35 PM, Juan Sequeiros 
> >> wrote:
> >> > Obaid,
> >> >
> >> > Since you mention that you will have dedicated ETL servers, and I assume
> >> > they
> >> > will also have a decent amount of RAM on them, I would not shy
> away
> >> > from increasing your threads.
> >> >
> >> > Also, in your staging directory, if you do not need to keep originals,
> >> > then
> >> > might 

Re: Data Ingestion for Large Source Files and Masking

2016-01-13 Thread Joe Witt
Obaid,

Great so this is helpful info.  Iostat output shows both CPU and disk
are generally bored and ready for more work.  Looking at the gc output
though suggests trouble.  We see 32 samples at 1-second intervals, and
in that time it spent more than 6 seconds doing
garbage collection, including 5 full collections.  That is usually a
sign of inefficient heap usage and/or simply an undersized heap.  What
size do you have your heap settings at in the conf/bootstrap.conf
file?
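
For reference, those settings are the java.arg lines in conf/bootstrap.conf;
the values Obaid posts elsewhere in this thread use exactly this form (the
arg numbers can differ between installs):

java.arg.2=-Xms512m
java.arg.3=-Xmx512m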

Thanks
Joe

On Wed, Jan 13, 2016 at 11:32 PM, obaidul karim  wrote:
> Hi Joe,
>
> Please find attached jstat & iostat output.
>
> So far it seems to me that it is CPU bound. However, your eyes are better
> than mine :).
>
> -Obaid
>
> On Thu, Jan 14, 2016 at 11:51 AM, Joe Witt  wrote:
>>
>> Hello
>>
>> Let's narrow in on potential issues.  So while this process is running
>> and appears sluggish in nature please run the following on the command
>> line
>>
>> 'jps'
>>
>> This command will tell you the process id of NiFi.  You'll want the
>> pid associated with the Java process other than what is called 'jps'
>> presuming there aren't other things running than NiFi at the time.
>>
>> Let's say the result is a pid of '12345'
>>
>> Then run this command
>>
>> 'jstat -gcutil 12345 1000'
>>
>> This will generate garbage collection information every one second
>> until you decide to stop it with cntl-c.  So let that run for a while
>> say 30 seconds or so then hit cntl-c.  Can you please paste that
>> output in response.  That will show us how the general health of GC
>> is.
>>
>> Another really important/powerful set of output can be gleaned by
>> running 'iostat' which gives you statistics about input/output to
>> things like the underlying storage system.  That is part of the
>> 'sysstat' package in case you need to install that.  But then you can
>> run
>>
>> 'iostat -xmh 1'
>>
>> Or something even as simple as 'iostat 1'.  Your specific command
>> string may vary.  Please let that run for say 10-20 seconds and paste
>> those results as well.  That will give a sense of io utilization while
>> the operation is running.
>>
>> Between these two outputs (Garbage Collection/IO) we should have a
>> pretty good idea of where to focus the effort to find why it is slow.
>>
>> Thanks
>> Joe
>>
>>
>> On Wed, Jan 13, 2016 at 9:23 PM, obaidul karim 
>> wrote:
>> > Hi Joe & Others,
>> >
>> > Thanks for all of your suggestions.
>> >
>> > Now I am using the below code:
>> > 1. Buffered reader (I tried to use NLKBufferedReader, but it requires
>> > too many libs & NiFi failed to start. I was lost.)
>> > 2. Buffered writer
>> > 3. Appending the line ending instead of concatenating a newline
>> >
>> > Still no performance gain. Am I doing something wrong? Is there
>> > anything else I can change here?
>> >
>> > flowfile = session.write(flowfile, new StreamCallback() {
>> > @Override
>> > public void process(InputStream in, OutputStream out) throws IOException
>> > {
>> > try (BufferedReader reader = new BufferedReader(new
>> > InputStreamReader(in, charset), maxBufferSize);
>> > BufferedWriter writer = new BufferedWriter(new
>> > OutputStreamWriter(out, charset));) {
>> >
>> > if(skipHeader == true && headerExists==true) { // to skip header, do an
>> > additional line fetch before going to next step
>> > if(reader.ready())   reader.readLine();
>> > } else if( skipHeader == false && headerExists == true) { // if header
>> > is
>> > not skipped then no need to mask, just pass through
>> > if(reader.ready())  {
>> > writer.write(reader.readLine());
>> > writer.write(lineEndingBuilder.toString());
>> > }
>> > }
>> > // decide about empty line earlier
>> > String line;
>> > while ((line = reader.readLine()) != null) {
>> > writer.write(parseLine(line, seperator, quote, escape, maskColumns));
>> > writer.write(lineEndingBuilder.toString());
>> > };
>> > writer.flush();
>> > }
>> > }
>> >
>> > });
>> >
>> >
>> > -Obaid
>> >
>> > On Wed, Jan 13, 2016 at 1:38 PM, Joe Witt  wrote:
>> >>
>> >> Hello
>> >>
>> >> So the performance went from what sounded pretty good to what sounds
>> >> pretty problematic.  The rate now sounds like it is around 5MB/s which
>> >> is indeed quite poor.  Building on what Bryan said there does appear
>> >> to be some good opportunities to improve the performance.  The link he
>> >> provided just expanded to cover the full range to look at is here [1].
>> >>
>> >> A couple of key points to note:
>> >> 1) Use a buffered line-oriented reader that preserves the new lines
>> >> 2) Write to a buffered writer that accepts strings and understands
>> >> which charset you intend to write out
>> >> 3) Avoid string concat with newline
>> >>
>> >> Also keep in mind how large any single line could be, because if
>> >> they can be quite large you may need to consider the GC pressure that
>> >> can be caused.  But let's take a look at how things are 

Re: Data Ingestion for Large Source Files and Masking

2016-01-12 Thread Bryan Bende
Obaid,

I can't say for sure how much this would improve performance, but you might
want to wrap the OutputStream with BufferedOutputStream or BufferedWriter.
Would be curious to hear if that helps.

A similar scenario from the standard processors is ReplaceText; here is one
example where it uses the StreamCallback:
https://github.com/apache/nifi/blob/ee14d8f9dd0c3f18920d910fcddd6d79b8b9f9cf/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java#L337
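
A minimal sketch of that suggestion, assuming the StreamCallback context from
earlier in the thread (the charset and the pass-through transform are
illustrative):

import java.io.*;
import java.nio.charset.StandardCharsets;

final class BufferedCopySketch {
    // What the wrapping looks like inside StreamCallback.process(in, out):
    // buffer both directions and make the charset explicit before line I/O.
    static void process(InputStream in, OutputStream out) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(out, StandardCharsets.UTF_8));
        String line;
        while ((line = reader.readLine()) != null) {
            writer.write(line);   // per-line transform would go here
            writer.newLine();     // readLine() dropped the original ending
        }
        writer.flush();           // flush, but let the framework close the streams
    }
}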

-Bryan

On Tue, Jan 12, 2016 at 8:38 PM, obaidul karim  wrote:

> Hi Joe,
>
> Yes, I took into consideration the existing RAID and HW settings. We have 10G
> NIC for all hadoop intra-connectivity and the server in question is an edge
> node of our hadoop cluster.
> In production scenarios we will use dedicated ETL servers having
> high-performance (>500MB/s) local disks.
>
> Sharing some good news: I have successfully masked & loaded 110 GB of data
> to HDFS using the below flow:
>
> ExecuteProcess(touch and mv to input dir) > ListFile (1 thread) >
> FetchFile (1 thread) > maskColumn (4 threads) > PutHDFS (1 thread).
>
> * Used 4 threads for masking and 1 for the others because I found it is the
> slowest component.
>
> However, it seems to be too slow. It was processing 2GB files in 6
> minutes. It may be because of my masking algorithm (although the masking
> algorithm is a pretty simple FPE with some simple twists).
> However, I want to be sure that the way I have written the custom processor
> is the most efficient. Please see the below code chunk and let me know whether
> it is the fastest way to process flowfiles (CSV source files) which need
> modifications on specific columns:
>
> * parseLine method contains logic for masking.
>
>flowfile = session.write(flowfile, new StreamCallback() {
> @Override
>public void process(InputStream in, OutputStream out) throws
> IOException {
>
> BufferedReader reader = new BufferedReader(new
> InputStreamReader(in));
> String line;
> if(skipHeader == true && headerExists==true) { // to skip header,
> do an additional line fetch before going to next step
> if(reader.ready())   reader.readLine();
> } else if( skipHeader == false && headerExists == true) { // if
> header is not skipped then no need to mask, just pass through
> if(reader.ready())
> out.write((reader.readLine()+"\n").getBytes());
> }
>
> // decide about empty line earlier
> while ((line = reader.readLine()) != null) {
> if(line.trim().length() > 0 ) {
> out.write( parseLine(line, seperator, quote, escape,
> maskColumns).getBytes() );
> }
> };
> out.flush();
>}
>});
>
>
>
>
> Thanks in advance.
> -Obaid
>
>
> On Tue, Jan 5, 2016 at 12:36 PM, Joe Witt  wrote:
>
>> Obaid,
>>
>> Really happy you're seeing the performance you need.  That works out
>> to about 110MB/s on average over that period.  Any chance you have a
>> 1Gb NIC?  If you really want to have fun with performance tuning you
>> can use things like iostat and other commands to observe disk,
>> network, cpu.  Something else to consider too is the potential
>> throughput gains of multiple RAID-1 containers rather than RAID-5
>> since NiFi can use both in parallel.  Depends on your goals/workload
>> so just an FYI.
>>
>> A good reference for how to build a processor which does altering of
>> the data (transformation) is here [1].  It is a good idea to do a
>> quick read through that document.  Also, one of the great things you
>> can do as well is look at existing processors.  Some good examples
>> relevant to transformation are [2], [3], and [4] which are quite
>> simple stream transform types. Or take a look at [5] which is a more
>> complicated example.  You might also be excited to know that there is
>> some really cool work done to bring various languages into NiFi which
>> looks on track to be available in the upcoming 0.5.0 release which is
>> NIFI-210 [6].  That will provide a really great option to quickly
>> build transforms using languages like Groovy, JRuby, JavaScript,
>> Scala, Lua, and Jython.
>>
>> [1]
>> https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content
>>
>> [2]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java
>>
>> [3]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java
>>
>> [4]
>> https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java
>>
>> [5]
>> 

Re: Data Ingestion for Large Source Files and Masking

2016-01-04 Thread Joe Witt
Obaid,

Really happy you're seeing the performance you need.  That works out
to about 110MB/s on average over that period.  Any chance you have a
1Gb NIC?  If you really want to have fun with performance tuning you
can use things like iostat and other commands to observe disk,
network, cpu.  Something else to consider too is the potential
throughput gains of multiple RAID-1 containers rather than RAID-5
since NiFi can use both in parallel.  Depends on your goals/workload
so just an FYI.

A good reference for how to build a processor that alters the data
(a transformation) is here [1].  It is a good idea to do a
quick read through that document.  Also, one of the great things you
can do as well is look at existing processors.  Some good examples
relevant to transformation are [2], [3], and [4] which are quite
simple stream transform types. Or take a look at [5] which is a more
complicated example.  You might also be excited to know that there is
some really cool work done to bring various languages into NiFi which
looks on track to be available in the upcoming 0.5.0 release which is
NIFI-210 [6].  That will provide a really great option to quickly
build transforms using languages like Groovy, JRuby, JavaScript,
Scala, Lua, and Jython.

[1] 
https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#enrich-modify-content

[2] 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Base64EncodeContent.java

[3] 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformXml.java

[4] 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ModifyBytes.java

[5] 
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java

[6] https://issues.apache.org/jira/browse/NIFI-210
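
To make the pattern in those references concrete, here is a hedged skeleton of
a stream-to-stream transform processor; the class name, relationship, and
empty transform body are illustrative, not code from this thread:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;

public class MaskColumnsSketch extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Masked content")
            .build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session)
            throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // Stream-to-stream rewrite: the content never has to fit in the heap.
        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                // read from 'in', transform/mask, write to 'out'
                // (see the line-oriented snippets elsewhere in this thread)
            }
        });
        session.transfer(flowFile, REL_SUCCESS);
    }
}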

Thanks
Joe

On Mon, Jan 4, 2016 at 9:32 PM, obaidul karim  wrote:
> Hi Joe,
>
> Just completed my test with 100GB of data (on a local RAID 5 disk on a single
> server).
>
> I was able to load 100GB of data within 15 minutes (awesome!!) using the below flow.
> This throughput is enough to load 10TB of data in a day with a single,
> simple machine.
> During the test, server disk I/O went up to 200MB/s.
>
> ExecuteProcess(touch and mv to input dir) > ListFile > FetchFile (4
> threads) > PutHDFS (4 threads)
>
> My next action is to incorporate my Java code for column masking into a
> custom processor.
> I am now exploring that. However, if you have any good references on
> custom processors (altering actual data), please let me know.
>
> Thanks,
> Obaid
>
>
>
> On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim  wrote:
>>
>> Hi Joe,
>>
>> Yes, a symlink is another option I was considering when I was trying to use
>> GetFile.
>> Thanks for your insights. I will update you on this mail chain when my
>> entire workflow completes, so that this could be a reference for others :).
>>
>> -Obaid
>>
>> On Monday, January 4, 2016, Joe Witt  wrote:
>>>
>>> Obaid,
>>>
>>> You make a great point.
>>>
>>> I agree we will ultimately need to do more to make that very valid
>>> approach work easily.  The downside is that it puts the onus on NiFi to
>>> keep track of a variety of potentially quite large state about the
>>> directory.  One way to avoid that expense is if NiFi can pull a copy
>>> of, then delete, the source file.  If you'd like to keep a copy around I
>>> wonder if a good approach is to simply create a symlink to the
>>> original file you want NiFi to pull but have the symlink in the NiFi
>>> pickup directory.  NiFi is then free to read and delete which means it
>>> simply pulls whatever shows up in that directory and doesn't have to
>>> keep state about filenames and checksums.
>>>
>>> I realize we still need to do what you're suggesting as well but
>>> thought I'd run this by you.
>>>
>>> Joe
>>>
>>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim 
>>> wrote:
>>> > Hi Joe,
>>> >
>>> > Consider a scenario where we need to feed some older files and we are
>>> > using
>>> > "mv" to feed files to the input directory (to reduce IO we may use "mv").
>>> > If we
>>> > use "mv", the last modified date will not change. And this is very common
>>> > on a
>>> > busy file collection system.
>>> >
>>> > However, I think I can still manage it by adding an additional "touch"
>>> > before
>>> > moving the file into the target directory.
>>> >
>>> > So, my suggestion is to add the file selection criteria as a configurable
>>> > option in the ListFile processor. Options could be last modified
>>> > date (as the current one), unique file names, checksum, etc.
>>> >
>>> > Thanks again 

Re: Data Ingestion for Large Source Files and Masking

2016-01-04 Thread obaidul karim
Hi Joe,

Just completed my test with 100GB of data (on a local RAID 5 disk on a single
server).

I was able to load 100GB of data within 15 minutes (awesome!!) using the below
flow. This throughput is enough to load 10TB of data in a day with a single,
simple machine.
During the test, server disk I/O went up to 200MB/s.

ExecuteProcess(touch and mv to input dir) > ListFile > FetchFile (4
threads) > PutHDFS (4 threads)

My next action is to incorporate my Java code for column masking into a
custom processor.
I am now exploring that. However, if you have any good references on
custom processors (altering actual data), please let me know.

Thanks,
Obaid



On Mon, Jan 4, 2016 at 9:11 AM, obaidul karim  wrote:

> Hi Joe,
>
> Yes, a symlink is another option I was considering when I was trying to use
> GetFile.
> Thanks for your insights. I will update you on this mail chain when my
> entire workflow completes, so that this could be a reference for others :).
>
> -Obaid
>
> On Monday, January 4, 2016, Joe Witt  wrote:
>
>> Obaid,
>>
>> You make a great point.
>>
>> I agree we will ultimately need to do more to make that very valid
>> approach work easily.  The downside is that it puts the onus on NiFi to
>> keep track of a variety of potentially quite large state about the
>> directory.  One way to avoid that expense is if NiFi can pull a copy
>> of, then delete, the source file.  If you'd like to keep a copy around I
>> wonder if a good approach is to simply create a symlink to the
>> original file you want NiFi to pull but have the symlink in the NiFi
>> pickup directory.  NiFi is then free to read and delete which means it
>> simply pulls whatever shows up in that directory and doesn't have to
>> keep state about filenames and checksums.
>>
>> I realize we still need to do what you're suggesting as well but
>> thought I'd run this by you.
>>
>> Joe
>>
>> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim 
>> wrote:
>> > Hi Joe,
>> >
>> > Consider a scenario where we need to feed some older files and we are
>> using
>> > "mv" to feed files to the input directory (to reduce IO we may use "mv").
>> If we
>> > use "mv", the last modified date will not change. And this is very common
>> on a
>> > busy file collection system.
>> >
>> > However, I think I can still manage it by adding an additional "touch"
>> before
>> > moving the file into the target directory.
>> >
>> > So, my suggestion is to add the file selection criteria as a configurable
>> > option in the ListFile processor. Options could be last modified
>> > date (as the current one), unique file names, checksum, etc.
>> >
>> > Thanks again man.
>> > -Obaid
>> >
>> >
>> > On Monday, January 4, 2016, Joe Witt  wrote:
>> >>
>> >> Hello Obaid,
>> >>
>> >> The default behavior of the ListFile processor is to keep track of the
>> >> last modified time of the files it lists.  When you changed the name
>> >> of the file that doesn't change the last modified time as tracked by
>> >> the OS but when you altered content it does.  Simply 'touch' on the
>> >> file would do it too.
>> >>
>> >> I believe we could observe the last modified time of the directory in
>> >> which the file lives to detect something like a rename.  However, we'd
>> >> not know which file was renamed just that something was changed.  So
>> >> it would require keeping some potentially problematic state to deconflict or
>> >> requiring the user to have a duplicate detection process afterwards.
>> >>
>> >> So with that in mind is the current behavior sufficient for your case?
>> >>
>> >> Thanks
>> >> Joe
>> >>
>> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim 
>> wrote:
>> >> > Hi Joe,
>> >> >
>> >> > I am now exploring your solution.
>> >> > Starting with below flow:
>> >> >
>> >> > ListFile > FetchFile > CompressContent > PutFile.
>> >> >
>> >> > Seems all fine. Except some confusion with how ListFile identifies
>> new
>> >> > files.
>> >> > In order to test, I renamed an already processed file and put it in the
>> input
>> >> > folder and found that the file was not processed.
>> >> > Then I randomly changed the content of the file and it was
>> immediately
>> >> > processed.
>> >> >
>> >> > My question is: what is the new file selection criteria for
>> "ListFile"?
>> >> > Can
>> >> > I change it to only the file name?
>> >> >
>> >> > Thanks in advance.
>> >> >
>> >> > -Obaid
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt 
>> wrote:
>> >> >>
>> >> >> Hello Obaid,
>> >> >>
>> >> >> At 6 TB/day and average size of 2-3GB per dataset you're looking at
>> a
>> >> >> sustained rate of 70+MB/s and a pretty low transaction rate.  So
>> well
>> >> >> within a good range to work with on a single system.
>> >> >>
>> >> >> "Is there any way to bypass writing flow files on disk or directly
>> >> >> pass those files to HDFS as-is?"
>> >> >>
>> >> >>   There is no way to 

Re: Data Ingestion for Large Source Files and Masking

2016-01-03 Thread Joe Witt
Hello Obaid,

The default behavior of the ListFile processor is to keep track of the
last modified time of the files it lists.  When you changed the name
of the file, that didn't change the last modified time as tracked by
the OS, but when you altered the content it did.  Simply running 'touch' on
the file would do it too.
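
A hedged Java equivalent of that 'touch' (the path is illustrative): bump the
file's mtime so ListFile's last-modified tracking lists it again.

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

final class TouchSketch {
    public static void main(String[] args) throws Exception {
        // Refresh the last modified time of an already-delivered file so
        // that ListFile, which tracks mtimes, will pick it up again.
        Path staged = Paths.get("/data/nifi/input/part-0001.csv");
        Files.setLastModifiedTime(staged, FileTime.from(Instant.now()));
    }
}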

I believe we could observe the last modified time of the directory in
which the file lives to detect something like a rename.  However, we'd
not know which file was renamed, just that something was changed.  So
it would require keeping some potentially problematic state to deconflict, or
requiring the user to have a duplicate detection process afterwards.

So with that in mind is the current behavior sufficient for your case?

Thanks
Joe

On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim  wrote:
> Hi Joe,
>
> I am now exploring your solution.
> Starting with below flow:
>
> ListFile > FetchFile > CompressContent > PutFile.
>
> Seems all fine. Except some confusion with how ListFile identifies new
> files.
> In order to test, I renamed an already processed file and put it in the input
> folder and found that the file was not processed.
> Then I randomly changed the content of the file and it was immediately
> processed.
>
> My question is: what is the new file selection criteria for "ListFile"? Can
> I change it to only the file name?
>
> Thanks in advance.
>
> -Obaid
>
>
>
>
>
>
>
> On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt  wrote:
>>
>> Hello Obaid,
>>
>> At 6 TB/day and average size of 2-3GB per dataset you're looking at a
>> sustained rate of 70+MB/s and a pretty low transaction rate.  So well
>> within a good range to work with on a single system.
>>
>> "Is there any way to bypass writing flow files on disk or directly
>> pass those files to HDFS as-is?"
>>
>>   There is no way to bypass NiFi taking a copy of that data by design.
>> NiFi is helping you formulate a graph of dataflow requirements from a
>> given source(s) through given processing steps and ultimately driving
>> data into given destination systems.  As a result it takes on the
>> challenge of handling transactionality of each interaction and the
>> buffering and backpressure to deal with the realities of different
>> production/consumption patterns.
>>
>> "If the files on the spool directory are compressed(zip/gzip), can we
>> store files on HDFS as uncompressed ?"
>>
>>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
>> out of the box.  You simply run the data through the proper process
>> prior to the PutHDFS process to unpack (zip) or decompress (gzip) as
>> needed.
>>
>> "2.a Can we use our existing java code for masking ? if yes then how ?
>> 2.b For this Scenario we also want to bypass storing flow files on
>> disk. Can we do it on the fly, masking and storing on HDFS ?
>> 2.c If the source files are compressed (zip/gzip), is there any issue
>> for masking here ?"
>>
>>   You would build a custom NiFi processor that leverages your existing
>> code.  If your code is able to operate on an InputStream and writes to
>> an OutputStream then it is very likely you'll be able to handle
>> arbitrarily large objects with zero negative impact to the JVM Heap as
>> well.  This is thanks to the fact that the data is present in NiFi's
>> repository with copy-on-write/pass-by-reference semantics and that the
>> API is exposing those streams to your code in a transactional manner.
>>
>>   If you want the process of writing to HDFS to also do decompression
>> and masking in one pass you'll need to extend/alter the PutHDFS
>> process to do that.  It is probably best to implement the flow using
>> cohesive processors (grab files, decompress files, mask files, write
>> to hdfs).  Given how the repository construct in NiFi works and given
>> how caching in Linux works it is very possible you'll be quite
>> surprised by the throughput you'll see.  Even then you can optimize
>> once you're sure you need to.  The other thing to keep in mind here is
>> that often a flow that starts out as specific as this turns into a
>> great place to tap the stream of data to feed some new system or new
>> algorithm with a different format or protocol.  At that moment the
>> benefits become even more obvious.
>>
>> Regarding the Flume processes in NiFi and their memory usage.  NiFi
>> offers a nice hosting mechanism for the Flume processes and brings
>> some of the benefits of NiFi's UI, provenance, repository concept.
>> However, we're still largely limited to the design assumptions one
>> gets when building a Flume process and that can be quite memory
>> limiting.  We see what we have today as a great way to help people
>> transition their existing Flume flows into NiFi by leveraging their
>> existing code but would recommend working to phase the use of those
>> out in time so that you can take full benefit of what NiFi brings over
>> Flume.
>>
>> Thanks
>> Joe
>>
>>
>> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim 

Re: Data Ingestion for Large Source Files and Masking

2016-01-03 Thread Joe Witt
Obaid,

You make a great point.

I agree we will ultimately need to do more to make that very valid
approach work easily.  The downside is that it puts the onus on NiFi to
keep track of a variety of potentially quite large state about the
directory.  One way to avoid that expense is if NiFi can pull a copy
of, then delete, the source file.  If you'd like to keep a copy around I
wonder if a good approach is to simply create a symlink to the
original file you want NiFi to pull but have the symlink in the NiFi
pickup directory.  NiFi is then free to read and delete which means it
simply pulls whatever shows up in that directory and doesn't have to
keep state about filenames and checksums.
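
A sketch of that staging idea, with illustrative paths: link the original into
the watched pickup directory, and NiFi can read and delete the link while the
original stays put.

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class SymlinkStagingSketch {
    public static void main(String[] args) throws Exception {
        Path original = Paths.get("/archive/2016-01/data.csv"); // kept as-is
        Path pickup = Paths.get("/data/nifi/input/data.csv");   // NiFi watches this
        Files.createSymbolicLink(pickup, original);             // deleting it removes only the link
    }
}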

I realize we still need to do what you're suggesting as well but
thought I'd run this by you.

Joe

On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim  wrote:
> Hi Joe,
>
> Consider a scenario where we need to feed some older files and we are using
> "mv" to feed files to the input directory (to reduce IO we may use "mv"). If we
> use "mv", the last modified date will not change. And this is very common on a
> busy file collection system.
>
> However, I think I can still manage it by adding an additional "touch" before
> moving the file into the target directory.
>
> So, my suggestion is to add the file selection criteria as a configurable
> option in the ListFile processor. Options could be last modified
> date (as the current one), unique file names, checksum, etc.
>
> Thanks again man.
> -Obaid
>
>
> On Monday, January 4, 2016, Joe Witt  wrote:
>>
>> Hello Obaid,
>>
>> The default behavior of the ListFile processor is to keep track of the
>> last modified time of the files it lists.  When you changed the name
>> of the file that doesn't change the last modified time as tracked by
>> the OS but when you altered content it does.  Simply 'touch' on the
>> file would do it too.
>>
>> I believe we could observe the last modified time of the directory in
>> which the file lives to detect something like a rename.  However, we'd
>> not know which file was renamed just that something was changed.  So
>> it would require keeping some potentially problematic state to deconflict or
>> requiring the user to have a duplicate detection process afterwards.
>>
>> So with that in mind is the current behavior sufficient for your case?
>>
>> Thanks
>> Joe
>>
>> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim  wrote:
>> > Hi Joe,
>> >
>> > I am now exploring your solution.
>> > Starting with below flow:
>> >
>> > ListFile > FetchFile > CompressContent > PutFile.
>> >
>> > Seems all fine. Except some confusion with how ListFile identifies new
>> > files.
>> > In order to test, I renamed an already processed file and put it in the input
>> > folder and found that the file was not processed.
>> > Then I randomly changed the content of the file and it was immediately
>> > processed.
>> >
>> > My question is: what is the new file selection criteria for "ListFile"?
>> > Can
>> > I change it to only the file name?
>> >
>> > Thanks in advance.
>> >
>> > -Obaid
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt  wrote:
>> >>
>> >> Hello Obaid,
>> >>
>> >> At 6 TB/day and average size of 2-3GB per dataset you're looking at a
>> >> sustained rate of 70+MB/s and a pretty low transaction rate.  So well
>> >> within a good range to work with on a single system.
>> >>
>> >> "Is there any way to bypass writing flow files on disk or directly
>> >> pass those files to HDFS as-is?"
>> >>
>> >>   There is no way to bypass NiFi taking a copy of that data by design.
>> >> NiFi is helping you formulate a graph of dataflow requirements from a
>> >> given source(s) through given processing steps and ultimately driving
>> >> data into given destination systems.  As a result it takes on the
>> >> challenge of handling transactionality of each interaction and the
>> >> buffering and backpressure to deal with the realities of different
>> >> production/consumption patterns.
>> >>
>> >> "If the files on the spool directory are compressed(zip/gzip), can we
>> >> store files on HDFS as uncompressed ?"
>> >>
>> >>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
>> >> out of the box.  You simply run the data through the proper process
>> >> prior to the PutHDFS process to unpack (zip) or decompress (gzip) as
>> >> needed.
>> >>
>> >> "2.a Can we use our existing java code for masking ? if yes then how ?
>> >> 2.b For this Scenario we also want to bypass storing flow files on
>> >> disk. Can we do it on the fly, masking and storing on HDFS ?
>> >> 2.c If the source files are compressed (zip/gzip), is there any issue
>> >> for masking here ?"
>> >>
>> >>   You would build a custom NiFi processor that leverages your existing
>> >> code.  If your code is able to operate on an InputStream and writes to
>> >> an OutputStream then it is very likely you'll be able to handle
>> >> 

Re: Data Ingestion for Large Source Files and Masking

2016-01-03 Thread obaidul karim
Hi Joe,

Consider a scenario where we need to feed some older files and we are
using "mv" to feed files to the input directory (to reduce IO we may use "mv").
If we use "mv", the last modified date will not change. And this is very
common on a busy file collection system.

However, I think I can still manage it by adding an additional "touch" before
moving the file into the target directory, as sketched below.
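
A sketch of that workaround with illustrative paths: refresh the mtime
("touch"), then move the file into the watched input directory ("mv").

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;

final class TouchThenMoveSketch {
    public static void main(String[] args) throws Exception {
        Path src = Paths.get("/collect/archive/older-file.csv");
        Files.setLastModifiedTime(src, FileTime.from(Instant.now())); // "touch"
        Files.move(src, Paths.get("/data/nifi/input/older-file.csv"),
                StandardCopyOption.ATOMIC_MOVE);                      // "mv" on the same filesystem
    }
}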

So, my suggestion is to add the file selection criteria as a configurable
option in the ListFile processor. Options could be last modified
date (as the current one), unique file names, checksum, etc.

Thanks again man.
-Obaid


On Monday, January 4, 2016, Joe Witt  wrote:

> Hello Obaid,
>
> The default behavior of the ListFile processor is to keep track of the
> last modified time of the files it lists.  When you changed the name
> of the file that doesn't change the last modified time as tracked by
> the OS but when you altered content it does.  Simply 'touch' on the
> file would do it too.
>
> I believe we could observe the last modified time of the directory in
> which the file lives to detect something like a rename.  However, we'd
> not know which file was renamed just that something was changed.  So
> it would require keeping some potentially problematic state to deconflict or
> requiring the user to have a duplicate detection process afterwards.
>
> So with that in mind is the current behavior sufficient for your case?
>
> Thanks
> Joe
>
> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim wrote:
> > Hi Joe,
> >
> > I am now exploring your solution.
> > Starting with below flow:
> >
> > ListFile > FetchFile > CompressContent > PutFile.
> >
> > Seems all fine. Except some confusion with how ListFile identifies new
> > files.
> > In order to test, I renamed an already processed file and put it in the input
> > folder and found that the file was not processed.
> > Then I randomly changed the content of the file and it was immediately
> > processed.
> >
> > My question is: what is the new file selection criteria for "ListFile"?
> Can
> > I change it to only the file name?
> >
> > Thanks in advance.
> >
> > -Obaid
> >
> >
> >
> >
> >
> >
> >
> >> On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt wrote:
> >>
> >> Hello Obaid,
> >>
> >> At 6 TB/day and average size of 2-3GB per dataset you're looking at a
> >> sustained rate of 70+MB/s and a pretty low transaction rate.  So well
> >> within a good range to work with on a single system.
> >>
> >> "Is there any way to bypass writing flow files on disk or directly
> >> pass those files to HDFS as is?"
> >>
> >>   There is no way to bypass NiFi taking a copy of that data by design.
> >> NiFi is helping you formulate a graph of dataflow requirements from a
> >> given source(s) through given processing steps and ultimately driving
> >> data into given destination systems.  As a result it takes on the
> >> challenge of handling transactionality of each interaction and the
> >> buffering and backpressure to deal with the realities of different
> >> production/consumption patterns.
> >>
> >> "If the files on the spool directory are compressed(zip/gzip), can we
> >> store files on HDFS as uncompressed ?"
> >>
> >>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
> >> out of the box.  You simply run the data through the proper processor
> >> prior to the PutHDFS processor to unpack (zip) or decompress (gzip) as
> >> needed.
> >>
> >> "2.a Can we use our existing java code for masking ? if yes then how ?
> >> 2.b For this Scenario we also want to bypass storing flow files on
> >> disk. Can we do it on the fly, masking and storing on HDFS ?
> >> 2.c If the source files are compressed (zip/gzip), is there any issue
> >> for masking here ?"
> >>
> >>   You would build a custom NiFi processor that leverages your existing
> >> code.  If your code is able to operate on an InputStream and write to
> >> an OutputStream, then it is very likely you'll be able to handle
> >> arbitrarily large objects with zero negative impact to the JVM Heap as
> >> well.  This is thanks to the fact that the data is present in NiFi's
> >> repository with copy-on-write/pass-by-reference semantics and that the
> >> API is exposing those streams to your code in a transactional manner.
> >>
> >>   If you want the process of writing to HDFS to also do decompression
> >> and masking in one pass you'll need to extend/alter the PutHDFS
> >> process to do that.  It is probably best to implement the flow using
> >> cohesive processors (grab files, decompress files, mask files, write
> >> to hdfs).  Given how the repository construct in NiFi works and given
> >> how caching in Linux works it is very possible you'll be quite
> >> surprised by the throughput you'll see.  Even then you can optimize
> >> once you're sure you need to.  The other thing to keep in mind here is
> >> that often a flow that starts out as specific as this turns into a
> >> 

Re: Data Ingestion for Large Source Files and Masking

2016-01-03 Thread obaidul karim
Hi Joe,

Yes, a symlink is another option I was considering when I was trying to use
GetFile; see the sketch just below.
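
A minimal sketch of the symlink approach, with hypothetical paths (NiFi can
then read and delete the link without touching the original file):

import java.io.IOException;
import java.nio.file.*;

public class LinkIntoPickup {
    public static void main(String[] args) throws IOException {
        // Hypothetical paths; adjust to your archive and NiFi pickup dirs.
        Path original = Paths.get("/data/archive/big-file.csv");
        Path pickupDir = Paths.get("/data/nifi-pickup");

        // Create a symlink in the pickup directory.  A processor configured
        // to delete after pickup removes only the link; the original file
        // stays where it is.
        Files.createSymbolicLink(pickupDir.resolve(original.getFileName()),
                original);
    }
}
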
Thanks for your insights. I will update you on this mail chain when my
entire workflow is complete, so that it can serve as a reference for others :).

-Obaid

On Monday, January 4, 2016, Joe Witt  wrote:

> Obaid,
>
> You make a great point.
>
> I agree we will ultimately need to do more to make that very valid
> approach work easily.  The downside is that puts the onus on NiFi to
> keep track of a variety of potentially quite large state about the
> directory.  One way to avoid that expense is if NiFi can pull a copy
> of, then delete, the source file.  If you'd like to keep a copy around I
> wonder if a good approach is to simply create a symlink to the
> original file you want NiFi to pull but have the symlink in the NiFi
> pickup directory.  NiFi is then free to read and delete which means it
> simply pulls whatever shows up in that directory and doesn't have to
> keep state about filenames and checksums.
>
> I realize we still need to do what you're suggesting as well but
> thought I'd run this by you.
>
> Joe
>
> On Sun, Jan 3, 2016 at 6:43 PM, obaidul karim wrote:
> > Hi Joe,
> >
> > Consider a scenario where we need to feed some older files and we are using
> > "mv" to move files into the input directory (to reduce I/O we may use "mv").
> > If we use "mv", the last modified date will not change. And this is very
> > common on a busy file collection system.
> >
> > However, I think I can still manage it by adding an additional "touch"
> > before moving the file into the target directory.
> >
> > So, my suggestion is to add the file selection criteria as a configurable
> > option in the ListFile processor. Options could be last modified
> > date (as it is now), unique file names, checksum, etc.
> >
> > Thanks again man.
> > -Obaid
> >
> >
> > On Monday, January 4, 2016, Joe Witt wrote:
> >>
> >> Hello Obaid,
> >>
> >> The default behavior of the ListFile processor is to keep track of the
> >> last modified time of the files it lists.  When you changed the name
> >> of the file, that didn't change the last modified time as tracked by
> >> the OS, but when you altered the content it did.  Simply running 'touch'
> >> on the file would do it too.
> >>
> >> I believe we could observe the last modified time of the directory in
> >> which the file lives to detect something like a rename.  However, we'd
> >> not know which file was renamed, just that something was changed.  So
> >> it would require keeping some potentially problematic state to deconflict,
> >> or requiring the user to have a duplicate detection process afterwards.
> >>
> >> So with that in mind is the current behavior sufficient for your case?
> >>
> >> Thanks
> >> Joe
> >>
> >> On Sun, Jan 3, 2016 at 6:17 AM, obaidul karim wrote:
> >> > Hi Joe,
> >> >
> >> > I am now exploring your solution.
> >> > Starting with below flow:
> >> >
> >> > ListFile > FetchFile > CompressContent > PutFile.
> >> >
> >> > All seems fine, except for some confusion about how ListFile identifies
> >> > new files.
> >> > In order to test, I renamed an already processed file and put it in the
> >> > input folder and found that the file was not processed.
> >> > Then I randomly changed the content of the file and it was immediately
> >> > processed.
> >> >
> >> > My question is: what are the new-file selection criteria for "ListFile"?
> >> > Can I change them to file name only?
> >> >
> >> > Thanks in advance.
> >> >
> >> > -Obaid
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt wrote:
> >> >>
> >> >> Hello Obaid,
> >> >>
> >> >> At 6 TB/day and average size of 2-3GB per dataset you're looking at a
> >> >> sustained rate of 70+MB/s and a pretty low transaction rate.  So well
> >> >> within a good range to work with on a single system.
> >> >>
> >> >> "Is there any way to bypass writing flow files on disk or directly
> >> >> pass those files to HDFS as is?"
> >> >>
> >> >>   There is no way to bypass NiFi taking a copy of that data by design.
> >> >> NiFi is helping you formulate a graph of dataflow requirements from a
> >> >> given source(s) through given processing steps and ultimately driving
> >> >> data into given destination systems.  As a result it takes on the
> >> >> challenge of handling transactionality of each interaction and the
> >> >> buffering and backpressure to deal with the realities of different
> >> >> production/consumption patterns.
> >> >>
> >> >> "If the files on the spool directory are compressed(zip/gzip), can we
> >> >> store files on HDFS as uncompressed ?"
> >> >>
> >> >>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
> >> >> out of the box.  You simply run the data through the proper processor
> >> >> prior to the PutHDFS processor to unpack 

Re: Data Ingestion for Large Source Files and Masking

2016-01-03 Thread obaidul karim
Hi Joe,

I am now exploring your solution.
Starting with below flow:

ListFile > FetchFile > CompressContent > PutFile.

All seems fine, except for some confusion about how ListFile identifies new
files.
In order to test, I renamed an already processed file and put it in the input
folder and found that the file was not processed.
Then I randomly changed the content of the file and it was immediately
processed.

My question is: what are the new-file selection criteria for "ListFile"? Can
I change them to file name only?

Thanks in advance.

-Obaid







On Fri, Jan 1, 2016 at 10:43 PM, Joe Witt  wrote:

> Hello Obaid,
>
> At 6 TB/day and average size of 2-3GB per dataset you're looking at a
> sustained rate of 70+MB/s and a pretty low transaction rate.  So well
> within a good range to work with on a single system.
>
> "Is there any way to bypass writing flow files on disk or directly
> pass those files to HDFS as is?"
>
>   There is no way to bypass NiFi taking a copy of that data by design.
> NiFi is helping you formulate a graph of dataflow requirements from a
> given source(s) through given processing steps and ultimately driving
> data into given destination systems.  As a result it takes on the
> challenge of handling transactionality of each interaction and the
> buffering and backpressure to deal with the realities of different
> production/consumption patterns.
>
> "If the files on the spool directory are compressed(zip/gzip), can we
> store files on HDFS as uncompressed ?"
>
>   Certainly.  Both of those formats (zip/gzip) are supported in NiFi
> out of the box.  You simply run the data through the proper processor
> prior to the PutHDFS processor to unpack (zip) or decompress (gzip) as
> needed.
>
> "2.a Can we use our existing java code for masking ? if yes then how ?
> 2.b For this Scenario we also want to bypass storing flow files on
> disk. Can we do it on the fly, masking and storing on HDFS ?
> 2.c If the source files are compressed (zip/gzip), is there any issue
> for masking here ?"
>
>   You would build a custom NiFi processor that leverages your existing
> code.  If your code is able to operate on an InputStream and write to
> an OutputStream, then it is very likely you'll be able to handle
> arbitrarily large objects with zero negative impact to the JVM Heap as
> well.  This is thanks to the fact that the data is present in NiFi's
> repository with copy-on-write/pass-by-reference semantics and that the
> API is exposing those streams to your code in a transactional manner.
>
>   If you want the process of writing to HDFS to also do decompression
> and masking in one pass you'll need to extend/alter the PutHDFS
> process to do that.  It is probably best to implement the flow using
> cohesive processors (grab files, decompress files, mask files, write
> to hdfs).  Given how the repository construct in NiFi works and given
> how caching in Linux works it is very possible you'll be quite
> surprised by the throughput you'll see.  Even then you can optimize
> once you're sure you need to.  The other thing to keep in mind here is
> that often a flow that starts out as specific as this turns into a
> great place to tap the stream of data to feed some new system or new
> algorithm with a different format or protocol.  At that moment the
> benefits become even more obvious.
>
> Regarding the Flume processes in NiFi and their memory usage.  NiFi
> offers a nice hosting mechanism for the Flume processes and brings
> some of the benefits of NiFi's UI, provenance, repository concept.
> However, we're still largely limited to the design assumptions one
> gets when building a Flume process and that can be quite memory
> limiting.  We see what we have today as a great way to help people
> transition their existing Flume flows into NiFi by leveraging their
> existing code but would recommend working to phase the use of those
> out in time so that you can take full benefit of what NiFi brings over
> Flume.
>
> Thanks
> Joe
>
>
> On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim  wrote:
> > Hi,
> >
> > I am new to NiFi and exploring it as an open source ETL tool.
> >
> > As per my understanding, flow files are stored on local disk and contain the
> > actual data.
> > If the above is true, let's consider the scenarios below:
> >
> > Scenario 1:
> > - In a spool directory we have terabytes (5-6 TB/day) of files coming from
> > external sources
> > - I want to push those files to HDFS as it is without any changes
> >
> > Scenario 2:
> > - In a spool directory we have terabytes (5-6 TB/day) of files coming from
> > external sources
> > - I want to mask some of the sensitive columns
> > - Then send one copy to HDFS and another copy to Kafka
> >
> > Question for Scenario 1:
> > 1.a In that case those 5-6 TB of data will be written again on local disk as
> > flow files and will cause double I/O, which eventually may cause slower
> > performance due to an I/O bottleneck.
> > Is there any way to bypass 

Re: Data Ingestion for Large Source Files and Masking

2016-01-01 Thread Joe Witt
Hello Obaid,

At 6 TB/day and average size of 2-3GB per dataset you're looking at a
sustained rate of 70+MB/s and a pretty low transaction rate.  So well
within a good range to work with on a single system.
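
(Back-of-envelope, for readers following along: 6 TB/day over 86,400 s/day
is roughly 70 MB/s sustained, and at 2-3 GB per file that is only around
2,000-3,000 files per day, i.e. one file every 30-40 seconds on average.)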

"Is there any way to bypass writing flow files on disk or directly
pass those files to HDFS as is?"

  There is no way to bypass NiFi taking a copy of that data by design.
NiFi is helping you formulate a graph of dataflow requirements from a
given source(s) through given processing steps and ultimately driving
data into given destination systems.  As a result it takes on the
challenge of handling transactionality of each interaction and the
buffering and backpressure to deal with the realities of different
production/consumption patterns.

"If the files on the spool directory are compressed(zip/gzip), can we
store files on HDFS as uncompressed ?"

  Certainly.  Both of those formats (zip/gzip) are supported in NiFi
out of the box.  You simply run the data through the proper processor
prior to the PutHDFS processor to unpack (zip) or decompress (gzip) as
needed.
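
For reference, the gzip case amounts to a straight stream copy through
java.util.zip; this sketch shows the idea conceptually (hypothetical paths,
not NiFi's actual implementation):

import java.io.*;
import java.util.zip.GZIPInputStream;

public class Gunzip {
    public static void main(String[] args) throws IOException {
        // Decompress a .gz file to a plain copy, 8 KB at a time.
        try (InputStream in = new GZIPInputStream(
                     new FileInputStream("/data/in/file.csv.gz"));
             OutputStream out = new FileOutputStream("/data/out/file.csv")) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
    }
}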

"2.a Can we use our existing java code for masking ? if yes then how ?
2.b For this Scenario we also want to bypass storing flow files on
disk. Can we do it on the fly, masking and storing on HDFS ?
2.c If the source files are compressed (zip/gzip), is there any issue
for masking here ?"

  You would build a custom NiFi processor that leverages your existing
code.  If your code is able to operate on an InputStream and write to
an OutputStream, then it is very likely you'll be able to handle
arbitrarily large objects with zero negative impact to the JVM Heap as
well.  This is thanks to the fact that the data is present in NiFi's
repository with copy-on-write/pass-by-reference semantics and that the
API is exposing those streams to your code in a transactional manner.
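
To make that concrete, here is a minimal sketch of such a processor;
maskLine() is a hypothetical stand-in for your existing masking code, and
the class and relationship names are illustrative only:

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Set;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.StreamCallback;

public class MaskColumns extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success").description("Masked content").build();

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // session.write exposes the content as streams; the repository
        // handles copy-on-write, so multi-GB files never need to fit in heap.
        flowFile = session.write(flowFile, new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8));
                BufferedWriter writer = new BufferedWriter(
                        new OutputStreamWriter(out, StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                    writer.write(maskLine(line)); // plug in your existing logic
                    writer.newLine();
                }
                writer.flush();
            }
        });
        session.transfer(flowFile, REL_SUCCESS);
    }

    // Hypothetical example: blank out the second CSV column.
    private String maskLine(String line) {
        String[] cols = line.split(",", -1);
        if (cols.length > 1) {
            cols[1] = "****";
        }
        return String.join(",", cols);
    }
}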

  If you want the process of writing to HDFS to also do decompression
and masking in one pass you'll need to extend/alter the PutHDFS
process to do that.  It is probably best to implement the flow using
cohesive processors (grab files, decompress files, mask files, write
to hdfs).  Given how the repository construct in NiFi works and given
how caching in Linux works it is very possible you'll be quite
surprised by the throughput you'll see.  Even then you can optimize
once you're sure you need to.  The other thing to keep in mind here is
that often a flow that starts out as specific as this turns into a
great place to tap the stream of data to feed some new system or new
algorithm with a different format or protocol.  At that moment the
benefits become even more obvious.

Regarding the Flume processes in NiFi and their memory usage.  NiFi
offers a nice hosting mechanism for the Flume processes and brings
some of the benefits of NiFi's UI, provenance, repository concept.
However, we're still largely limited to the design assumptions one
gets when building a Flume process and that can be quite memory
limiting.  We see what we have today as a great way to help people
transition their existing Flume flows into NiFi by leveraging their
existing code but would recommend working to phase the use of those
out in time so that you can take full benefit of what NiFi brings over
Flume.

Thanks
Joe


On Fri, Jan 1, 2016 at 4:18 AM, obaidul karim  wrote:
> Hi,
>
> I am new to NiFi and exploring it as an open source ETL tool.
>
> As per my understanding, flow files are stored on local disk and contain the
> actual data.
> If the above is true, let's consider the scenarios below:
>
> Scenario 1:
> - In a spool directory we have terabytes (5-6 TB/day) of files coming from
> external sources
> - I want to push those files to HDFS as it is without any changes
>
> Scenario 2:
> - In a spool directory we have terabytes (5-6 TB/day) of files coming from
> external sources
> - I want to mask some of the sensitive columns
> - Then send one copy to HDFS and another copy to Kafka
>
> Question for Scenario 1:
> 1.a In that case those 5-6 TB of data will be written again on local disk as
> flow files and will cause double I/O, which eventually may cause slower
> performance due to an I/O bottleneck.
> Is there any way to bypass writing flow files on disk or directly pass
> those files to HDFS as is?
> 1.b If the files in the spool directory are compressed (zip/gzip), can we
> store files on HDFS as uncompressed?
>
> Question for Scenario 2:
> 2.a Can we use our existing Java code for masking? If yes, then how?
> 2.b For this scenario we also want to bypass storing flow files on disk. Can
> we do it on the fly, masking and storing on HDFS?
> 2.c If the source files are compressed (zip/gzip), is there any issue for
> masking here?
>
>
> In fact, I tried the above using Flume + Flume interceptors. Everything works
> fine with smaller files, but when source files are greater than 50 MB Flume
> chokes :(.
> So, I am exploring options in NiFi. Hope I will get some guidance from you
> guys.
>
>
> Thanks in advance.