Re: Coding a Processor that writes to multiple output flowfiles at once

2015-11-23 Thread Bryan Bende
Hi Salvatore,

Have you looked at the append() method on ProcessSession which lets you
append to the content of a FlowFile?

You should be able to create several new FlowFiles, and then, while reading
lines from the incoming FlowFile, append the appropriate parts to each of
the new FlowFiles.
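That single-pass pattern can be illustrated with a minimal plain-Java analogue that uses no NiFi classes at all. It assumes each ByteArrayOutputStream stands in for the content of one new FlowFile, and that a naive comma split is good enough (quoted fields are not handled). The class and method names are hypothetical; this is not NiFi's API, only the shape of "read once, append to N outputs".

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogue, no NiFi classes: each ByteArrayOutputStream stands in
// for the content of one new "column" FlowFile that is appended to repeatedly.
final class ColumnSplitSketch {

    // Reads the input exactly once; column i of every line is appended to output i.
    static List<String> splitColumns(InputStream input, int numColumns) throws IOException {
        List<ByteArrayOutputStream> outputs = new ArrayList<>();
        for (int i = 0; i < numColumns; i++) {
            outputs.add(new ByteArrayOutputStream());
        }
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(input, StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                // Naive split: quoted fields containing commas are not handled.
                String[] fields = line.split(",", -1);
                for (int i = 0; i < numColumns && i < fields.length; i++) {
                    outputs.get(i).write((fields[i] + "\n").getBytes(StandardCharsets.UTF_8));
                }
            }
        }
        List<String> result = new ArrayList<>();
        for (ByteArrayOutputStream out : outputs) {
            result.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
        }
        return result;
    }

    // Convenience overload for in-memory text; wraps the (unlikely) IOException.
    static List<String> split(String csv, int numColumns) {
        try {
            return splitColumns(
                    new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)), numColumns);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> columns = split("a,1\nb,2\n", 2);
        System.out.print(columns.get(0)); // prints "a" and "b" on separate lines
        System.out.print(columns.get(1)); // prints "1" and "2" on separate lines
    }
}
```

In a real processor, the ByteArrayOutputStreams would be replaced by session.create() plus repeated session.append() calls, as in RouteText.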

An example processor that does something like this is the new RouteText
processor:
https://github.com/apache/nifi/blob/773576e041088d9e326f1d2e84b0ad8acbd6cfdc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java#L485

Let us know if this helps.

Thanks,

Bryan


On Mon, Nov 23, 2015 at 2:40 AM, Salvatore Papa wrote:

> Heya NiFi devs,
>
> I'm having a bit of trouble wrapping my head around a valid way of
> tackling this problem with the available Processor templates. I'd like to
> split an input flowfile into N different flowfiles, each going to one of N
> relationships.
>
> A simplistic way of viewing it would be: A very large CSV file, with N
> columns, and I want to split each column into its own flowfile, and each of
> these flowfiles to its own relationship (or with an attribute saying which
> column it belongs to).
>
> Basic premise is for an example with two columns, and only two lines:
> * Read a line, write first column value to flowfile A, write second column
> value to flowfile B
> * Read next line, appending first column value to flowfile A, appending
> second column value to flowfile B
> Followed by one of:
> * Send flowfile A to relationship A, and send flowfile B to relationship B
> or
> * Set attribute "A" on flowfile A and attribute "B" on flowfile B, then send
> both A and B to a 'success' relationship.
>
> Unfortunately, I can't seem to find a way to write to multiple flowfiles at
> once, or at least, write to an outputstream for one flowfile, then write to
> another outputstream for another flowfile, then continue writing to the
> first flowfile.
>
> If they weren't such large files, I'd be okay with reading the input file N
> times, pulling out a different part each time, but I'd like to read each
> line (and, by extension, the file) only once.
>
> I've written AbstractProcessors before for simple One-to-One
> transformations, even Merge processors which are an extension of
> AbstractSessionFactoryProcessors to do Many-to-One, and even Split
> AbstractProcessors for One-to-Many in serial (splitting at different
> places, even with clone(flowfile, start, size)). But I can't work out a way
> to do this One-to-Many in parallel.
>
> Any ideas? Am I missing something useful? Do I just have to read it
> multiple times? Just a really simple proof of concept explaining the
> design would be enough to get me started.
>
> Kind regards,
> Salvatore
>


Re: Coding a Processor that writes to multiple output flowfiles at once

2015-11-23 Thread Mark Payne
Hey Salvatore,

I think the key piece that you are missing is the ProcessSession.append()
method. You can use it to efficiently append to FlowFile A, then to
FlowFile B, then to FlowFile A again, then to FlowFile C, or what-have-you.
A good example that comes to mind is the RouteText Processor, which is
available on the 'master' branch.

So the overall logic would look something like:

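// Imports needed by the fragment below:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;

// Inside onTrigger(): assumes originalFlowFile came from session.get(),
// numColumns is known up front, and session is a final parameter.
final List<FlowFile> flowFiles = new ArrayList<>();
for (int i = 0; i < numColumns; i++) {
    FlowFile colFlowFile = session.create(originalFlowFile);
    flowFiles.add(colFlowFile);
}

// read the original FlowFile once, line by line
session.read(originalFlowFile, new InputStreamCallback() {
    @Override
    public void process(final InputStream rawIn) throws IOException {
        try (final BufferedReader in = new BufferedReader(
                new InputStreamReader(rawIn, StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                // naive split: quoted fields containing commas are not handled
                final String[] columns = line.split(",");
                for (int i = 0; i < columns.length && i < flowFiles.size(); i++) {
                    // the anonymous class below can only capture final locals
                    final byte[] value = (columns[i] + "\n").getBytes(StandardCharsets.UTF_8);
                    FlowFile colFlowFile = flowFiles.get(i);
                    colFlowFile = session.append(colFlowFile, new OutputStreamCallback() {
                        @Override
                        public void process(final OutputStream out) throws IOException {
                            out.write(value);
                        }
                    });
                    // append() returns an updated FlowFile reference; keep the latest one
                    flowFiles.set(i, colFlowFile);
                }
            }
        }
    }
});

// afterwards, for example (REL_SUCCESS is a placeholder relationship):
// session.transfer(flowFiles, REL_SUCCESS); session.remove(originalFlowFile);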

But as mentioned, the RouteText processor is a great example of a complete
processor that does this.

Let us know if you run into any more problems!

Thanks
-Mark



Re: Coding a Processor that writes to multiple output flowfiles at once

2015-11-23 Thread Salvatore Papa
Hi Bryan, Mark,

I knew I was forgetting something! So I had actually noticed the
session.append, but I think there are two caveats which make it not what I
want:

1) Does session.append truly 'append'? Or will it re-write/copy the content
that's already in the new flowfile? (I think I saw that as a note somewhere;
please correct me if I'm wrong.) If I'm right, performance-wise that's
actually worse than reading/writing the input file O(N) times: it would make
it O(N^2).
2) The output actually isn't raw text: I have other writers using that
stream, for example reading a text CSV and writing each column to Avro.
That output may not be 'appendable', which would require the output
flowfile OutputStreams to stay open for the entire duration of the
processing.

Thanks for the suggestion though. I hadn't seen the RouteText processor
before, and it's actually very close to what I'm looking for! I'll play
around with it and see if it suits. At the very least, splitting via text
first (skipping the requirement for the nested writer) and then writing each
of those out to (e.g.) Avro in a second processor may be the best bet.

Thanks Bryan and Mark!

On Tue, Nov 24, 2015 at 1:18 AM, Mark Payne  wrote:

> [...]

Re: Coding a Processor that writes to multiple output flowfiles at once

2015-11-23 Thread Mark Payne
Salvatore,

The caveat about the append method is that if you append to an incoming
FlowFile, it has to copy the contents of the incoming FlowFile before it can
append to it. However, if you append to a new FlowFile (or a FlowFile that
you've already written to in the same session), then the append is extremely
efficient and does not need to copy anything. It actually holds open an
OutputStream under the hood so that you can keep writing to the same
OutputStream.
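To put numbers on the O(N^2) concern, here is a toy cost model in plain Java. It is not NiFi's content repository; it only counts bytes written under two assumed strategies: a hypothetical naive append that rewrites all existing content on every call, versus the behaviour described above, where existing content is copied at most once and the stream then stays open. The class and method names are illustrative.

```java
// Toy cost model, NOT NiFi's content repository: it only counts bytes written.
final class AppendCostModel {

    // Hypothetical naive strategy: every append rewrites all existing content
    // before writing the new chunk, so total work grows quadratically.
    static long copyOnEveryAppend(int appends, int chunkBytes) {
        long written = 0;
        long size = 0;
        for (int i = 0; i < appends; i++) {
            written += size + chunkBytes; // copy old content, then add the new chunk
            size += chunkBytes;
        }
        return written;
    }

    // Behaviour as described in this thread: existing content (0 bytes for a
    // new FlowFile) is copied at most once, then the stream stays open and each
    // append writes only the new chunk, so total work grows linearly.
    static long openStreamAppend(long existingBytes, int appends, int chunkBytes) {
        return existingBytes + (long) appends * chunkBytes;
    }

    public static void main(String[] args) {
        System.out.println(copyOnEveryAppend(1000, 10));   // 5005000
        System.out.println(openStreamAppend(0, 1000, 10)); // 10000
    }
}
```

For 1,000 appends of 10 bytes to a new FlowFile, the naive model writes 5,005,000 bytes against 10,000 for the open-stream model, which is the quadratic-versus-linear gap in question.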

In general, though, I would advise going the route that you specified here:
in one processor you split the data out as you need, and in a second
processor you do the conversion from one format to another. These are very
independent concepts, so breaking them into separate processors increases
cohesion and lets you easily reuse the Processors if you need to do
something slightly different later.
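The two-processor design can be mimicked with two independent plain-Java stages: one that only splits text into per-column outputs, and one that only converts a single output. The class name and the toy converters (upper-casing, and a bracketed list standing in for a real format such as Avro) are hypothetical. The point of the sketch is that swapping the converter never touches the splitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of the two-processor design; plain Java, no NiFi classes.
// Each stage is an independent function, so either can be reused or replaced.
final class TwoStageSketch {

    // Stage 1 (stands in for the splitting processor): one text output per
    // column, with no knowledge of any output format.
    static List<String> splitStage(String csv, int numColumns) {
        List<StringBuilder> columns = new ArrayList<>();
        for (int i = 0; i < numColumns; i++) {
            columns.add(new StringBuilder());
        }
        for (String line : csv.split("\n")) {
            String[] fields = line.split(",", -1); // naive: no quoted fields
            for (int i = 0; i < numColumns && i < fields.length; i++) {
                columns.get(i).append(fields[i]).append('\n');
            }
        }
        List<String> result = new ArrayList<>();
        for (StringBuilder column : columns) {
            result.add(column.toString());
        }
        return result;
    }

    // Stage 2 (stands in for a conversion processor): converts each output
    // independently with whatever converter it is given.
    static List<String> convertStage(List<String> inputs, Function<String, String> converter) {
        List<String> result = new ArrayList<>();
        for (String input : inputs) {
            result.add(converter.apply(input));
        }
        return result;
    }

    // Hypothetical toy converter standing in for a real format such as Avro.
    static String toBracketedList(String column) {
        return "[" + column.trim().replace('\n', ',') + "]";
    }

    public static void main(String[] args) {
        List<String> columns = splitStage("a,1\nb,2\n", 2);
        // The same split output feeds two different converters unchanged.
        System.out.println(convertStage(columns, TwoStageSketch::toBracketedList)); // [[a,b], [1,2]]
        System.out.println(convertStage(columns, String::toUpperCase));
    }
}
```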

Thanks
-mark


> On Nov 23, 2015, at 10:49 AM, Salvatore Papa  wrote:
> [...]

Re: Coding a Processor that writes to multiple output flowfiles at once

2015-11-23 Thread Salvatore Papa
Perfect, thanks Mark.

On Tue, Nov 24, 2015 at 2:55 AM, Mark Payne  wrote:

> [...]