Good points - I've done some testing.

About 1-2 minutes for 1 month's data with 1k page sizes and about half that
for 10k. About 8-10 minutes for 1 years worth of data at 10k pages.

Per month looks like the sweet spot in terms of size - that's about
500-750MB.

In terms of building the upstream tools to generate the queries, is the
paginatedjsonquery the way to go to retrieve the oldest and most recent
date from an index?


~

On Sun, Aug 20, 2023 at 1:53 AM Chris Sampson <[email protected]> wrote:

> I'd guess it depends on what you want to achieve downstream, e.g. would
> setting the query processor to output per_query and return everything in 1
> to be useful? Internally, the processor is so fetching everything in pages
> from Elasticsearch, setting the size higher will reduce the number of
> network round-trips, but note that nifi will hold the entire response from
> Elasticsearch in memory until it is written to a flowfile - this is fine
> before the next loop within the processor, even if the prices session isn't
> committed and you don't see the output for a while.
>
> You've a choice to make between number of network calls (page fetches),
> number of queries (which kind of amounts to the same thing really), page
> size in memory (will impact both nifi and elasticsearch, as well as network
> performance), and number of flowfiles you want to deal with downstream -
> having all your data in a single flowfile might be useful, if you can use
> Record-based processors for everything you want to do later - the fewer
> flowfiles you have, the more performance your flow is likely to be (general
> oversimplification).
>
> How long did it take for you to fetch a day of data using 1k page sizes?
> Did it work if you up page size to 10k? How about 10k page for a month or a
> whole year?
>
> If you decide to break up the query by time range, e.g. years or months,
> then a python or groovy script is certainly an option in order to generate
> the parameters (e.g. attributes on a flowfile) to feed into the query.
>
> On 2023/08/19 05:05:39 Richard Beare wrote:
> > A bit of progress.
> > First up, firing a match_all at my index with 20M documents doesn't work,
> > as you probably expected. Or more precisely, is unlikely to be useful - I
> > left it overnight and nothing appeared to have happened, so I guess it
> was
> > madly fetching pages and filling up available storage.
> >
> > So I tested with a query of the form
> > {
> > query": {
> >   "range" : {
> >     "Visit_DateTime": {
> >       "gte" : "01/07/2020",
> >       "lte" : "02/07/2020",
> >       "format" : "dd/MM/yyyy"
> >     }
> >   }
> > }
> > }
> >
> > i.e a single days worth of documents (38998 according to a curl _count
> > version of the query). This did indeed produce 3900 flowfiles in the hits
> > queue and consume the input.
> >
> > Including a size parameter as follows:
> >
> > {
> > "size" : 1000,
> > query": {
> >   "range" : {
> >     "Visit_DateTime": {
> >       "gte" : "01/07/2020",
> >       "lte" : "02/07/2020",
> >       "format" : "dd/MM/yyyy"
> >     }
> >   }
> > }
> > }
> >
> > Leads to 39 flowfiles in the hits queue.
> >
> > So it looks like my best way forward processing many years worth of data
> is
> > to generate a set of day-based queries. Is a python script the best
> option?
> >
> >
> >
> >
> > On Fri, Aug 18, 2023 at 4:03 PM Chris Sampson <[email protected]> wrote:
> >
> > > Ah, so these processors have all been written for Elasticsearch, and
> use
> > > the Elasticsearch low-level REST API library to form connections.
> They've
> > > not been tested against OpenSearch, although hopefully should work for
> any
> > > interactions where the API is the same, but the two products continue
> to
> > > diverge, so there's increasing chance that some things won't work.
> > >
> > > Any details of things that aren't working would be good to know about
> > > (e.g. raised as Jira tickets, containing a much detail as possible,
> like
> > > the query used and any log details of errors), so that the community
> could
> > > look into providing OpenSearch compatibility in the future.
> > >
> > > I've known a few people try with OpenSearch and things either work, or
> we
> > > don't hear about the errors that are received, so we don't know what
> needs
> > > looking at from a NiFi perspective.
> > >
> > > On 2023/08/18 04:37:10 Richard Beare wrote:
> > > > I did use the example and got errors. I'll revisit that (perhaps it
> is an
> > > > opensearch idiosyncrasy).  The per response option is probably my
> issue.
> > > > I'll check that out and get back to you.
> > > > Thanks again
> > > >
> > > > On Fri, Aug 18, 2023 at 2:30 PM Chris Sampson <[email protected]>
> wrote:
> > > >
> > > > > Check the example in the processor's additional details docs [1]
> for
> > > how
> > > > > you could set size and sort fields for the query - size is used to
> > > > > determine the number of documents returned per page, sorry is
> required
> > > if
> > > > > using a "search after" or "point in time" query type.
> > > > >
> > > > > If the Query property is set, the incoming FlowFile content should
> be
> > > > > ignored, i.e. it doesn't need to be empty.
> > > > >
> > > > > Use the "Search Results Split" property to determine how the
> results
> > > are
> > > > > output. This defaults to "per response", which outputs a flowfile
> for
> > > every
> > > > > page of results. As PaginatedJsonQueryElasticsearch takes an input
> > > > > flowfile, its internal "process session" remains active until the
> > > processor
> > > > > completes and commits is session - this happens when there are no
> more
> > > > > results to retrieve from Elasticsearch, at which point the input
> > > flowfile
> > > > > disappears from the input queue and all output flowfiles appear in
> the
> > > > > output queues. This is how the nifi framework handles sessions, but
> > > can be
> > > > > confusing if you're not aware of that beforehand.
> > > > >
> > > > > SearchElasticsearch is different in this regard because its session
> > > ends
> > > > > after every iteration (determined by the "Search Results Split",
> e.g.
> > > this
> > > > > could be per page or per entire query), then uses nifi state to
> setup
> > > the
> > > > > next iteration. This means you could start to see output flowfiles
> > > sooner.
> > > > >
> > > > >
> > > > > [1]
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html
> > > > >
> > > > > On 2023/08/17 22:13:22 Richard Beare wrote:
> > > > > > Thanks, that makes sense. I've had trouble getting a size
> parameter
> > > > > > accepted, but will work on that later.
> > > > > >
> > > > > > However, I'm unsure what I should expect to see in the following
> test
> > > > > > scenario.
> > > > > >
> > > > > > A fixed query in the Query parameter - a match all. i.e. nothing
> > > dynamic
> > > > > > set by upstream processing
> > > > > >
> > > > > > An empty input flowfile to trigger activity.
> > > > > >
> > > > > > The test index is large. (20M docs)
> > > > > >
> > > > > > Do I expect the processor to begin filling the output queue as
> fast
> > > as it
> > > > > > can, with one flowfile per received page, pausing as the queue
> fills?
> > > > > > That was what I was anticipating, but at the moment I'm getting
> no
> > > output
> > > > > > and the input flowfile isn't being consumed. I suspect one flag
> is
> > > wrong,
> > > > > > but can't see it.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson <
> [email protected]>
> > > > > wrote:
> > > > > >
> > > > > > > Again, sounds like it's working as documented [1] - an input is
> > > > > required
> > > > > > > to trigger the PaginatedJsonQueryElasticsearch processor, so
> > > something
> > > > > like
> > > > > > > GenerateFlowFile is a way to achieve that if you want to
> > > periodically
> > > > > > > execute a paginated query, e.g. by setting the Generate
> processor's
> > > > > > > schedule to run every hour, or use cron syntax, etc. The
> advantage
> > > with
> > > > > > > this processor is that you can use the output of another
> processor
> > > > > (e.g.
> > > > > > > build a query using the results of another processor, such as
> an
> > > > > initial
> > > > > > > query of Elasticsearch) to trigger the paginated query of
> > > > > Elasticsearch,
> > > > > > > but once the query is finished, the processor won't keep
> firing.
> > > > > > >
> > > > > > > Conversely, SearchElasticsearch does not allow incoming
> > > connections,
> > > > > but
> > > > > > > only triggers the same query on the defined schedule. If the
> query
> > > > > needs to
> > > > > > > use parameters (or some sort of variable), you need to figure
> out
> > > how
> > > > > to
> > > > > > > apply that in the Query parameter of the processor - it could
> be by
> > > > > > > Elasticsearch notation (e.g. "now/d" for the start of the
> current
> > > day
> > > > > in a
> > > > > > > date range filter), or something that can be achieved using
> NiFi
> > > > > Expression
> > > > > > > Language [2], but without the flexibility of providing inputs
> in
> > > > > FlowFile
> > > > > > > content, which could be the output of a previous query, or
> > > > > > > GenerateFlowFile, etc.
> > > > > > >
> > > > > > > You need to figure out what query you want to run, what
> input(s)
> > > are
> > > > > > > appropriate, and the schedule to which you want to execute.
> > > > > > >
> > > > > > > The Search processor is aimed more at a use case of "I want to
> > > > > continually
> > > > > > > retrieve the contents of an Elasticsearch index/query as it is
> > > > > populated
> > > > > > > from an extremal source", PaginatedQuery is more for "I want to
> > > > > retrieve
> > > > > > > data from Elasticsearch that match a query"; both processors
> are
> > > meant
> > > > > to
> > > > > > > "allow for the possibility of many documents to be retrieved".
> > > > > > >
> > > > > > > For various reasons, neither processor was designed to hold
> state
> > > > > between
> > > > > > > initiation of paginated queries, e.g. they don't follow the
> > > pattern of
> > > > > a
> > > > > > > "Consume" or "List" processor that attempts to retain the
> > > knowledge of
> > > > > the
> > > > > > > "last timestamp" within NiFi itself. That's something that
> could be
> > > > > > > considered, but would need a code change (feel free to raise a
> jira
> > > > > ticket
> > > > > > > for the future [3] if you think that would be helpful). One of
> the
> > > > > reasons
> > > > > > > for this is that, unlike an S3 Bucket (for example), documents
> are
> > > not
> > > > > > > guaranteed to always be indexed within Elasticsearch in
> order/with
> > > > > such an
> > > > > > > "updated at" field, although one could design their system that
> > > way, of
> > > > > > > course.
> > > > > > >
> > > > > > > [1]
> > > > > > >
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > >
> > > > > > > [2]
> > > > > > >
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
> > > > > > >
> > > > > > > [3] https://issues.apache.org/jira/browse/NIFI
> > > > > > >
> > > > > > > On 2023/08/17 12:43:31 Richard Beare wrote:
> > > > > > > > I must be missing something simple. I've copied the
> parameters
> > > and
> > > > > query
> > > > > > > > from the SearchElasticSearch processor and I'm not getting
> > > errors,
> > > > > but no
> > > > > > > > flowfiles are produced.
> > > > > > > >
> > > > > > > > I'm forced to add an input connection, despite coding the
> query
> > > in
> > > > > the
> > > > > > > > Query property. I have a GenerateFlowFile processor
> connected.
> > > I'm
> > > > > > > using.a
> > > > > > > > basic match all as a starting point
> > > > > > > > {
> > > > > > > > "query" :
> > > > > > > >     {
> > > > > > > >     "match_all" : {}
> > > > > > > >     }
> > > > > > > > }
> > > > > > > >
> > > > > > > > Sending the query via curl appears to work OK - I get a page
> of
> > > stuff
> > > > > > > back.
> > > > > > > > I'm using nifi 1.20.
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson <
> [email protected]
> > > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Elasticsearch doesn't have a CDC-like capability (it
> doesn't
> > > > > maintain a
> > > > > > > > > transaction log or such), so that approach isn't possible.
> > > > > > > > >
> > > > > > > > > What I've done previously is to maintain an audit log in a
> > > separate
> > > > > > > index
> > > > > > > > > within elasticsearch to track what data I've previously
> posted,
> > > > > e.g.
> > > > > > > this
> > > > > > > > > might be the last "updated_date" value read from the data
> > > index in
> > > > > a
> > > > > > > > > previous run of the nifi processor. So your nifi Flow
> would be
> > > > > > > something
> > > > > > > > > like:
> > > > > > > > >
> > > > > > > > > Query for latest processed updated_date > paginated query
> for
> > > all
> > > > > new
> > > > > > > data
> > > > > > > > > > determine new latest updated_date (e.g. using
> QueryRecord) >
> > > put
> > > > > new
> > > > > > > > > latest updated_date into elasticsearch, ready for the next
> run
> > > > > > > > >
> > > > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote:
> > > > > > > > > > One further question - what is the recommended way of
> > > checking
> > > > > for
> > > > > > > > > updates
> > > > > > > > > > in an index and fetching new records in a similar manner
> to
> > > > > > > > > > GenerateTableFetch for an sql DB?
> > > > > > > > > > Thanks
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare <
> > > > > > > [email protected]>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Sounds perfect. Thanks
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson <
> > > > > [email protected]>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> What you describe sounds like the processor is
> working as
> > > > > > > designed &
> > > > > > > > > > >> documented, i.e. it will restart the same query once
> it
> > > has
> > > > > > > reached
> > > > > > > > > the end
> > > > > > > > > > >> of the paginated scroll (or search_after, or
> > > point-in-time)
> > > > > query.
> > > > > > > > > > >>
> > > > > > > > > > >> Instead, it sounds like you want to try using the
> > > > > > > > > > >> PaginatedJsonQueryElasticsearch [1] processor instead.
> > > This
> > > > > will
> > > > > > > > > execute
> > > > > > > > > > >> the query given to it, either as the query property
> or the
> > > > > body
> > > > > > > of an
> > > > > > > > > > >> incoming FlowFile, output the results, and then stop.
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> [1]
> > > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html
> > > > > > > > > > >>
> > > > > > > > > > >> On 2023/08/16 07:57:43 Richard Beare wrote:
> > > > > > > > > > >> > Hi,
> > > > > > > > > > >> > I am using the SearchElasticSearch (1.20.0)
> processor to
> > > > > > > retrieve
> > > > > > > > > all
> > > > > > > > > > >> > documents (~20M) from an index, process and
> eventually
> > > > > return
> > > > > > > > > results
> > > > > > > > > > >> to a
> > > > > > > > > > >> > new index, although for this test I'm retrieving and
> > > > > processing
> > > > > > > then
> > > > > > > > > > >> > discarding. I'm using opensearch.
> > > > > > > > > > >> >
> > > > > > > > > > >> > My problem is that the process restarts after
> > > completion - I
> > > > > > > > > discovered
> > > > > > > > > > >> > this, and docs confirm, after seeing warnings from
> my
> > > > > processing
> > > > > > > > > code
> > > > > > > > > > >> > (which reformats json ready for other work) being
> > > repeated
> > > > > for
> > > > > > > the
> > > > > > > > > same
> > > > > > > > > > >> > document ID.
> > > > > > > > > > >> >
> > > > > > > > > > >> > How do I configure the processor to stop after the
> > > > > completing
> > > > > > > the
> > > > > > > > > first
> > > > > > > > > > >> > query.
> > > > > > > > > > >> >
> > > > > > > > > > >> > I've tried the following:
> > > > > > > > > > >> >
> > > > > > > > > > >> > Query: {"query" : {"match_all" :{}}}
> > > > > > > > > > >> >
> > > > > > > > > > >> > with pagination_type SCROLL
> > > > > > > > > > >> >
> > > > > > > > > > >> > I haven't found a combination of the properties that
> > > doesn't
> > > > > > > lead to
> > > > > > > > > > >> > repeated cycles through the index.
> > > > > > > > > > >> >
> > > > > > > > > > >> > I've also tried {"query" : {"match_all" :{}},
> "sort" :
> > > > > > > > > > >> [{"Visit_DateTime" :
> > > > > > > > > > >> > "asc"]}}
> > > > > > > > > > >> >
> > > > > > > > > > >> > and SEARCH_AFTER pagination type, with the same
> problem.
> > > > > > > > > > >> >
> > > > > > > > > > >> > What am I missing?
> > > > > > > > > > >> > Thanks
> > > > > > > > > > >> >
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to