Thank you for the detailed explanation! It sounds like you have built
something very cool here.

I'm still digesting the different steps and thinking about what can be
done, but one thing that initially jumped out at me was your comment
about how much memory NiFi has and not wanting to go over 1000 records
per chunk...

You should be able to read and write the chunks in a streaming fashion
and never have an entire chunk in memory. For example, when creating a
chunk you would loop over a ResultSet from the database and write each
record to the OutputStream of the FlowFile, never having all 1000
records in memory. In the downstream processor you would read each
record from the InputStream of the FlowFile and send it to the
destination database, again without having all 1000 records in memory.
If you can operate like this, then having 1000 records per chunk, or
100,000 records per chunk, shouldn't change the memory requirement for
NiFi.

An example of what we do for ExecuteSQL and QueryDatabaseTable is in the
JdbcCommon util, where it converts the ResultSet to Avro records by
writing to the OutputStream:
https://github.com/apache/nifi/blob/e4b7e47836edf47042973e604005058c28eed23b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/JdbcCommon.java#L80
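
To make that concrete, here is a rough sketch of the write side
(untested; writeRecord() is a placeholder for however you serialize a
row, e.g. CSV or Avro, and REL_SUCCESS is your processor's success
relationship):

    import java.io.IOException;
    import java.io.OutputStream;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.io.OutputStreamCallback;

    // Inside onTrigger, with "rs" an open ResultSet from the source DB:
    FlowFile flowFile = session.create();
    flowFile = session.write(flowFile, new OutputStreamCallback() {
        @Override
        public void process(final OutputStream out) throws IOException {
            try {
                // Stream one row at a time; only the current row is
                // ever held in memory.
                while (rs.next()) {
                    writeRecord(rs, out); // placeholder serializer
                }
            } catch (final SQLException e) {
                throw new IOException(e);
            }
        }
    });
    session.transfer(flowFile, REL_SUCCESS);

Each record goes straight from the ResultSet to NiFi's content
repository, so the chunk size shouldn't affect heap usage.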

Another point is that it is not necessarily a bad thing to have, say,
10,000 FlowFiles in a queue. The queue is not actually holding the
content of those FlowFiles; it is only holding pointers to where the
content is, and only when the next processor does a
session.read(flowFile, ...) does it read in the content as a stream. In
general, NiFi should be able to handle tens of thousands, or even
hundreds of thousands, of FlowFiles sitting in a queue.
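
The read side is the same idea. Again just a sketch: readNextRecord(),
sendToDestination(), and MyRecord are placeholders for your parsing,
your JDBC insert, and your row type:

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.nifi.processor.io.InputStreamCallback;

    session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(final InputStream in) throws IOException {
            // Parse and forward one record at a time instead of
            // buffering the whole chunk.
            MyRecord record; // placeholder row type
            while ((record = readNextRecord(in)) != null) {
                sendToDestination(record); // placeholder JDBC insert
            }
        }
    });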

With your current approach, have you seen a specific issue, such as
out-of-memory exceptions? Or were you just concerned by the number of
FlowFiles in the queue continuing to grow?

I'll continue to think about this, and maybe someone else on the list
has additional ideas/thoughts.

-Bryan



On Wed, Jun 8, 2016 at 12:29 PM, Shaine Berube <
shaine.ber...@perfectsearchcorp.com> wrote:

> Perhaps I need to explain a little more about the data flow as a whole.
> But yes, Processor A is a custom-built processor.
>
> In explanation:
> The data flow that I've built out is basically a four- to six-step
> process (six steps because the first and last processors are optional).
> In the four-step process, step one gathers information from the source
> and target databases in preparation for the migration of data from
> source to target; this includes table names, primary keys, and record
> counts. Step one then produces a flow file per table, which in this
> case is 24 flow files.
>
> Step two of the process would be the equivalent of Processor A: in step
> two I'm taking in a flow file and generating the SQL queries that are
> going to be run.  The reason the back pressure doesn't work, therefore,
> is that the processor is working on one file, which corresponds to a
> table, and that table will be split into 1000-record chunks via the SQL
> query splitting.  A fair few of these tables, however, are over 10
> million records, which means that on a single execution this processor
> will generate over 10,000 flow files (1000-record chunks).  As far as it
> goes, I cannot save this information directly to the VM or server that
> I'm running the data flow on, because the information can contain
> extremely sensitive and secure data.  That being said, I need to
> consider how much memory the NiFi process has to run, so I don't want
> to go over 1000 records in a chunk.
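>
> For a rough idea, the per-table chunking looks something like this
> (simplified; the real processor also tracks incremental state, the
> LIMIT/OFFSET form here is just illustrative, and the usual imports plus
> java.nio.charset.StandardCharsets are assumed):
>
>     final int chunkSize = 1000;
>     for (long offset = 0; offset < recordCount; offset += chunkSize) {
>         final String sql = "SELECT * FROM " + tableName
>                 + " ORDER BY " + primaryKey
>                 + " LIMIT " + chunkSize + " OFFSET " + offset;
>         FlowFile chunk = session.create(original);
>         chunk = session.write(chunk, new OutputStreamCallback() {
>             @Override
>             public void process(final OutputStream out) throws IOException {
>                 out.write(sql.getBytes(StandardCharsets.UTF_8));
>             }
>         });
>         session.transfer(chunk, REL_SUCCESS);
>     }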
>
> Step three of the process takes each individual flow file from the
> queue, pulls the SQL query out of the flow file contents, runs it
> against the source, puts the results in either a CSV or an XML format
> into the contents of a flow file, and sends it to the next queue.
>
> Step four of the process takes the results out of the flow file
> contents, sticks them into an SQL query, and runs it against the target.
>
> Keep in mind: this data flow has been built to handle migration, but it
> is also attempting to keep up to date (incrementor/listener) with the
> source database.  Given that we don't have full access to the source
> database, I'm basically limited to running SELECT queries against it
> and gathering the information I need to put into the target.  But this
> data flow is configured to handle INSERT and UPDATE SQL queries, with
> DELETE queries coming some time in the future.  The data flow is
> configured so that step one can be either the migrator (full data dump)
> or the incrementor (incremental data dump, used after the migrator has
> been run).
>
> Now, the six-step process adds a step before step one that allows step
> one to be multi-threaded, and it adds a step after step four that runs
> the queries (basically, step four turns into the step that generates
> queries for the next step to run).  If you want more information on the
> six-step process I can give it, but it's not necessary for the question
> I'm asking.
>
> So, now knowing more about the whole data flow model: is there a way
> for me to do some sort of notification system on step two so that we
> can keep the queue between step two and step three limited to 2000?
> Keep in mind, each step in the four-step process is a separate
> processor.  So there are four processors involved in the data flow I'm
> working with; all four have been custom developed by yours truly, and
> they work directly with the databases and run SQL queries.
>
> On Wed, Jun 8, 2016 at 9:31 AM, Bryan Bende <bbe...@gmail.com> wrote:
>
> > OK, I didn't realize you had already tried setting the back-pressure
> > settings. Can you describe the processors a little more? Are they
> > custom processors?
> >
> > I am guessing that ProcessorA is producing all 5k flow files from a
> > single execution of onTrigger, which would explain why back-pressure
> > didn't solve the problem: back-pressure would stop the processor from
> > executing again, but it's already too late because the first execution
> > already went over the limit.
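> >
> > In other words, if onTrigger does something roughly like this (just a
> > sketch to illustrate the timing), back-pressure never gets a chance
> > to kick in mid-execution:
> >
> >     // one execution of onTrigger
> >     for (final String sql : allChunkQueries) {  // could be 10k+ queries
> >         FlowFile chunk = session.create(original);
> >         // ... write sql into the chunk's content ...
> >         session.transfer(chunk, REL_SUCCESS);
> >     }
> >     // back-pressure is only evaluated before the NEXT onTrigger call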
> >
> > Without knowing too much about what ProcessorA is doing, I'm wondering
> > if there is a way to put some indirection between the two processors.
> > What if ProcessorA sent its output to a PutFile processor that wrote
> > all the chunks out to a directory, and there was a separate GetFile
> > processor that was concurrently picking up the chunks from that
> > directory and sending them to ProcessorB?
> >
> > Then the back-pressure between GetFile and ProcessorB would work,
> > because once the queue reached 2000, GetFile wouldn't pick up any more
> > files. The downside is you would need enough disk space on your NiFi
> > node to possibly store your whole database table, which may not be an
> > option.
> >
> > Another idea might be to have two levels of chunks. For example, with
> > the SplitText processor, if we want to split a file with 1 million
> > lines in it, rather than do one split producing 1 million flow files,
> > we usually do a split to 10k-line chunks, then another split down to
> > 1 line. Maybe ProcessorA could produce much larger chunks, say 10k or
> > 100k records each, and then the next processor could further split
> > those before going to ProcessorB. This would also allow back-pressure
> > to work a little better between the second split processor and
> > ProcessorB.
> >
> > If anyone else has ideas here, feel free to chime in.
> >
> > Thanks,
> >
> > Bryan
> >
> > On Wed, Jun 8, 2016 at 10:51 AM, Shaine Berube <
> > shaine.ber...@perfectsearchcorp.com> wrote:
> >
> > > I do need more information, because I tried using that option and
> > > the processor just continued filling the queue anyway. I told it to
> > > only allow 2000 before back pressure kicks in, but it kept going,
> > > and I ended up with 5k files in the queue before I restarted NiFi
> > > to get the processor to stop.
> > >
> > > On Wed, Jun 8, 2016 at 8:45 AM, Bryan Bende <bbe...@gmail.com> wrote:
> > >
> > > > Hello,
> > > >
> > > > Take a look at the options available when right-clicking on a
> > > > queue... What you described is what NiFi calls back-pressure. You
> > > > can configure a queue to have an object threshold (# of flow
> > > > files) or a data size threshold (total size of all flow files).
> > > > When one of these thresholds is reached, NiFi will no longer let
> > > > the source processor run until the condition goes back under the
> > > > threshold.
> > > >
> > > > Let us know if you need any more info on this.
> > > >
> > > > Thanks,
> > > >
> > > > Bryan
> > > >
> > > > On Wed, Jun 8, 2016 at 10:40 AM, Shaine Berube <
> > > > shaine.ber...@perfectsearchcorp.com> wrote:
> > > >
> > > > > Hello all,
> > > > >
> > > > > I'm kind of new to developing NiFi, though I've been doing some
> > > > > pretty in-depth stuff and some advanced database queries.  My
> > > > > question is regarding the queues between processors: I want to
> > > > > limit a queue to... say, 2000. How would I go about doing that?
> > > > > Or better yet, how would I tell the processor generating the
> > > > > queue to only put a max of 2000 files into the queue?
> > > > >
> > > > > Allow me to explain with a scenario:
> > > > > We are doing data migration from one database to another.
> > > > > - Processor A is generating a queue consumed by Processor B.
> > > > > - Processor A is taking configuration and generating SQL queries
> > > > > in 1000-record chunks so that Processor B can insert them into a
> > > > > new database.  Given the size of the source database, Processor A
> > > > > can potentially generate hundreds of thousands of files.
> > > > >
> > > > > Is there a way for Processor A to check its downstream queue's
> > > > > size?  How would I get Processor A to only put 2000 files into
> > > > > the queue at any given time, so that Processor A can continue
> > > > > running but wait for room in the queue?
> > > > >
> > > > > Thank you in advance.
> > > > >
> > > > > --
> > > > > *Shaine Berube*
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Shaine Berube*
> > >
> >
>
>
>
> --
> *Shaine Berube*
>
