Perfect, thanks Mark.

On Tue, Nov 24, 2015 at 2:55 AM, Mark Payne <marka...@hotmail.com> wrote:
> Salvatore,
>
> The caveat about the append method is that if you append to an incoming FlowFile, it has to copy the contents of the incoming FlowFile before it can append to it. However, if you append to a new FlowFile (or a FlowFile that you've already written to in the same session), then the append is extremely efficient and does not need to copy anything. It actually holds open an OutputStream under the hood so that you can keep writing to the same OutputStream.
>
> In general, though, I would advise going the route that you specified here, where in one processor you split the data out as you need and in a second processor you do the conversion from one format to another. These are very independent concepts, so breaking them into separate processors increases cohesion and lets you easily reuse the Processors if you need to do something slightly different later.
>
> Thanks
> -mark
>
> > On Nov 23, 2015, at 10:49 AM, Salvatore Papa <salvatore.p...@gmail.com> wrote:
> >
> > Hi Bryan, Mark,
> >
> > I knew I was forgetting something! I had actually noticed session.append, but I think there are two caveats which make it not what I want:
> >
> > 1) Does session.append truly 'append'? Or will it re-write/copy the content that's already in the new flowfile? (I think I saw that as a note somewhere; please correct me if I'm wrong.) If I'm right, then performance-wise that's actually worse than reading/writing the input file O(N) times; it would make it O(N^2).
> > 2) The output actually isn't raw/text; I have other writers using that stream. For example, reading a text CSV and writing each column to Avro. That may not be 'appendable', hence requiring the output flowfile OutputStreams to stay open for the entire duration of the processing.
> >
> > Thanks for the suggestion, though; I hadn't seen the RouteText processor before.
> > It's actually very close to what I'm looking for! I'll play around with it and see if it suits. At the very least, splitting via text first (skipping the requirement for the nested writer) and then writing each of those out to (e.g.) Avro in a second processor may be the best bet.
> >
> > Thanks Bryan and Mark!
> >
> > On Tue, Nov 24, 2015 at 1:18 AM, Mark Payne <marka...@hotmail.com> wrote:
> >
> >> Hey Salvatore,
> >>
> >> I think the key piece that you are missing is the ProcessSession.append() method. You can use it to efficiently append to FlowFile A, then to FlowFile B, then to FlowFile A again, then to FlowFile C, or what-have-you. A good example that comes to mind is the RouteText Processor, which is available on the 'master' branch.
> >>
> >> So the overall logic would look something like:
> >>
> >> import java.io.BufferedReader;
> >> import java.io.IOException;
> >> import java.io.InputStream;
> >> import java.io.InputStreamReader;
> >> import java.io.OutputStream;
> >> import java.nio.charset.StandardCharsets;
> >> import java.util.ArrayList;
> >> import java.util.List;
> >> import org.apache.nifi.flowfile.FlowFile;
> >> import org.apache.nifi.processor.io.InputStreamCallback;
> >> import org.apache.nifi.processor.io.OutputStreamCallback;
> >>
> >> // Inside onTrigger(context, session); numColumns is assumed to be known up front.
> >> final List<FlowFile> flowFiles = new ArrayList<>();
> >> for (int i = 0; i < numColumns; i++) {
> >>     // One child FlowFile per column, inheriting the original's attributes
> >>     flowFiles.add(session.create(originalFlowFile));
> >> }
> >>
> >> // Read the original FlowFile once, line by line
> >> session.read(originalFlowFile, new InputStreamCallback() {
> >>     @Override
> >>     public void process(final InputStream rawIn) throws IOException {
> >>         try (final BufferedReader in = new BufferedReader(new InputStreamReader(rawIn, StandardCharsets.UTF_8))) {
> >>             String line;
> >>             while ((line = in.readLine()) != null) {
> >>                 final String[] columns = line.split(",");
> >>                 for (int i = 0; i < columns.length; i++) {
> >>                     final String value = columns[i]; // effectively final copy for the inner callback
> >>                     FlowFile colFlowFile = flowFiles.get(i);
> >>                     colFlowFile = session.append(colFlowFile, new OutputStreamCallback() {
> >>                         @Override
> >>                         public void process(final OutputStream out) throws IOException {
> >>                             out.write(value.getBytes(StandardCharsets.UTF_8));
> >>                             out.write('\n'); // keep one value per line
> >>                         }
> >>                     });
> >>                     flowFiles.set(i, colFlowFile);
> >>                 }
> >>             }
> >>         }
> >>     }
> >> });
> >>
> >> But as mentioned, the RouteText processor is a great full processor to use as an example.
> >>
> >> Let us know if you run into any more problems!
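A toy cost model may help make the O(N) versus O(N^2) question in this thread concrete. It is a sketch under stated assumptions, not a description of NiFi's content repository: it counts the bytes written when n values of chunkSize bytes are appended to one output, (a) if every append re-copies all existing content, (b) if one OutputStream is simply held open, and (c) under one reading of Mark's incoming-FlowFile caveat, where the existing content is copied once and later appends are cheap. The class and method names are hypothetical.

```java
/**
 * Toy cost model (an assumption, not NiFi internals): bytes written when a
 * single output receives n appends of chunkSize bytes each.
 */
public class AppendCostModel {

    // (a) Every append first re-copies all existing content, then writes the new chunk.
    static long copyOnEveryAppend(int n, int chunkSize) {
        long total = 0;
        long existing = 0;
        for (int i = 0; i < n; i++) {
            total += existing + chunkSize; // old bytes copied again + new chunk
            existing += chunkSize;
        }
        return total;
    }

    // (b) One OutputStream held open for the whole pass: each append writes only its chunk.
    static long streamHeldOpen(int n, int chunkSize) {
        return (long) n * chunkSize;
    }

    // (c) Existing content of initialSize bytes is copied once, then appends are cheap.
    static long copyOnceThenAppend(long initialSize, int n, int chunkSize) {
        return initialSize + streamHeldOpen(n, chunkSize);
    }

    public static void main(String[] args) {
        int chunkSize = 16;
        for (int n : new int[] {10, 1_000, 100_000}) {
            System.out.printf("n=%d  copy-every-time=%d  held-open=%d  copy-once=%d%n",
                    n,
                    copyOnEveryAppend(n, chunkSize),
                    streamHeldOpen(n, chunkSize),
                    copyOnceThenAppend(1_000_000L, n, chunkSize));
        }
    }
}
```

Copy-on-every-append grows as chunkSize * n(n+1)/2, which is the quadratic cost Salvatore was worried about; the other two strategies grow linearly in n.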
> >>
> >> Thanks
> >> -Mark
> >>
> >>> On Nov 23, 2015, at 2:40 AM, Salvatore Papa <salvatore.p...@gmail.com> wrote:
> >>>
> >>> Heya NiFi devs,
> >>>
> >>> I'm having a bit of trouble wrapping my head around a valid way of tackling this problem with the available Processor templates. I'd like to split an input flowfile into N different flowfiles, each going into one of N relationships.
> >>>
> >>> A simplistic way of viewing it: a very large CSV file with N columns, where I want to split each column into its own flowfile and send each of these flowfiles to its own relationship (or tag it with an attribute saying which column it belongs to).
> >>>
> >>> The basic premise, for an example with two columns and only two lines:
> >>> * Read a line, write the first column value to flowfile A, write the second column value to flowfile B
> >>> * Read the next line, appending the first column value to flowfile A and the second column value to flowfile B
> >>> Followed by one of:
> >>> * Send flowfile A to relationship A, and send flowfile B to relationship B
> >>> or
> >>> * Set attribute "A" on flowfile A and attribute "B" on flowfile B, then send both A and B to a 'success' relationship.
> >>>
> >>> Unfortunately, I can't seem to find a way to write to multiple flowfiles at once, or at least, to write to the OutputStream of one flowfile, then write to the OutputStream of another flowfile, then continue writing to the first flowfile.
> >>>
> >>> If they weren't such large files, I'd be okay with reading the input file N times, pulling out a different part each time, but I'd like to read each line (and by extension, the file) only once.
> >>>
> >>> I've written AbstractProcessors before for simple one-to-one transformations, Merge processors which are an extension of AbstractSessionFactoryProcessor to do many-to-one, and even Split AbstractProcessors for one-to-many in serial (splitting at different places, even with clone(flowfile, start, size)). But I can't work out a way to do this one-to-many in parallel.
> >>>
> >>> Any ideas? Am I missing something useful? Do I just have to read it multiple times? A really simple proof of concept explaining the design would be enough to get me started.
> >>>
> >>> Kind regards,
> >>> Salvatore
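Salvatore's request for a simple proof of concept can be sketched without any NiFi dependencies. The plain-Java sketch below is a stand-in, not the NiFi API: it reads the CSV once and keeps one OutputStream per column open for the whole pass, which is the "write to A, then B, then back to A" pattern the question asks about. The class name ColumnSplitter and the "column.N" labels are hypothetical; the labels stand in for the per-column attribute option from the question. It buffers everything in ByteArrayOutputStreams and splits naively on commas (no quoted fields), so it only shows the control flow and is not suitable for very large files.

```java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-pass column splitter (plain Java, not the NiFi API). */
public class ColumnSplitter {

    /**
     * Reads the input exactly once and returns "column.N" -> that column's values,
     * one per line. Columns beyond numColumns are ignored; missing ones are skipped.
     */
    static Map<String, String> split(Reader input, int numColumns) throws IOException {
        // One output per column, all kept open for the whole pass.
        List<ByteArrayOutputStream> outs = new ArrayList<>();
        for (int i = 0; i < numColumns; i++) {
            outs.add(new ByteArrayOutputStream());
        }
        try (BufferedReader in = new BufferedReader(input)) {
            String line;
            while ((line = in.readLine()) != null) {
                // Naive split on commas: no support for quoted fields (sketch assumption).
                String[] columns = line.split(",", -1);
                for (int i = 0; i < numColumns && i < columns.length; i++) {
                    OutputStream out = outs.get(i); // A, then B, then back to A on the next line
                    out.write(columns[i].getBytes(StandardCharsets.UTF_8));
                    out.write('\n');
                }
            }
        }
        // "column.N" labels stand in for the per-column attribute from the question.
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < numColumns; i++) {
            result.put("column." + i, new String(outs.get(i).toByteArray(), StandardCharsets.UTF_8));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> parts = split(new StringReader("a1,b1\na2,b2\n"), 2);
        parts.forEach((name, content) -> System.out.print(name + ":\n" + content));
    }
}
```

In a NiFi processor, each ByteArrayOutputStream would instead be a child FlowFile from session.create(originalFlowFile) that is extended with session.append(...), as in Mark's reply, so the column data goes to the content repository rather than the heap.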