Good points - I've done some testing. About 1-2 minutes for 1 month's data with 1k page sizes and about half that for 10k. About 8-10 minutes for 1 years worth of data at 10k pages.
Per month looks like the sweet spot in terms of size - that's about 500-750MB. In terms of building the upstream tools to generate the queries, is the paginatedjsonquery the way to go to retrieve the oldest and most recent date from an index? ~ On Sun, Aug 20, 2023 at 1:53 AM Chris Sampson <[email protected]> wrote: > I'd guess it depends on what you want to achieve downstream, e.g. would > setting the query processor to output per_query and return everything in 1 > to be useful? Internally, the processor is so fetching everything in pages > from Elasticsearch, setting the size higher will reduce the number of > network round-trips, but note that nifi will hold the entire response from > Elasticsearch in memory until it is written to a flowfile - this is fine > before the next loop within the processor, even if the prices session isn't > committed and you don't see the output for a while. > > You've a choice to make between number of network calls (page fetches), > number of queries (which kind of amounts to the same thing really), page > size in memory (will impact both nifi and elasticsearch, as well as network > performance), and number of flowfiles you want to deal with downstream - > having all your data in a single flowfile might be useful, if you can use > Record-based processors for everything you want to do later - the fewer > flowfiles you have, the more performance your flow is likely to be (general > oversimplification). > > How long did it take for you to fetch a day of data using 1k page sizes? > Did it work if you up page size to 10k? How about 10k page for a month or a > whole year? > > If you decide to break up the query by time range, e.g. years or months, > then a python or groovy script is certainly an option in order to generate > the parameters (e.g. attributes on a flowfile) to feed into the query. > > On 2023/08/19 05:05:39 Richard Beare wrote: > > A bit of progress. > > First up, firing a match_all at my index with 20M documents doesn't work, > > as you probably expected. Or more precisely, is unlikely to be useful - I > > left it overnight and nothing appeared to have happened, so I guess it > was > > madly fetching pages and filling up available storage. > > > > So I tested with a query of the form > > { > > query": { > > "range" : { > > "Visit_DateTime": { > > "gte" : "01/07/2020", > > "lte" : "02/07/2020", > > "format" : "dd/MM/yyyy" > > } > > } > > } > > } > > > > i.e a single days worth of documents (38998 according to a curl _count > > version of the query). This did indeed produce 3900 flowfiles in the hits > > queue and consume the input. > > > > Including a size parameter as follows: > > > > { > > "size" : 1000, > > query": { > > "range" : { > > "Visit_DateTime": { > > "gte" : "01/07/2020", > > "lte" : "02/07/2020", > > "format" : "dd/MM/yyyy" > > } > > } > > } > > } > > > > Leads to 39 flowfiles in the hits queue. > > > > So it looks like my best way forward processing many years worth of data > is > > to generate a set of day-based queries. Is a python script the best > option? > > > > > > > > > > On Fri, Aug 18, 2023 at 4:03 PM Chris Sampson <[email protected]> wrote: > > > > > Ah, so these processors have all been written for Elasticsearch, and > use > > > the Elasticsearch low-level REST API library to form connections. > They've > > > not been tested against OpenSearch, although hopefully should work for > any > > > interactions where the API is the same, but the two products continue > to > > > diverge, so there's increasing chance that some things won't work. > > > > > > Any details of things that aren't working would be good to know about > > > (e.g. raised as Jira tickets, containing a much detail as possible, > like > > > the query used and any log details of errors), so that the community > could > > > look into providing OpenSearch compatibility in the future. > > > > > > I've known a few people try with OpenSearch and things either work, or > we > > > don't hear about the errors that are received, so we don't know what > needs > > > looking at from a NiFi perspective. > > > > > > On 2023/08/18 04:37:10 Richard Beare wrote: > > > > I did use the example and got errors. I'll revisit that (perhaps it > is an > > > > opensearch idiosyncrasy). The per response option is probably my > issue. > > > > I'll check that out and get back to you. > > > > Thanks again > > > > > > > > On Fri, Aug 18, 2023 at 2:30 PM Chris Sampson <[email protected]> > wrote: > > > > > > > > > Check the example in the processor's additional details docs [1] > for > > > how > > > > > you could set size and sort fields for the query - size is used to > > > > > determine the number of documents returned per page, sorry is > required > > > if > > > > > using a "search after" or "point in time" query type. > > > > > > > > > > If the Query property is set, the incoming FlowFile content should > be > > > > > ignored, i.e. it doesn't need to be empty. > > > > > > > > > > Use the "Search Results Split" property to determine how the > results > > > are > > > > > output. This defaults to "per response", which outputs a flowfile > for > > > every > > > > > page of results. As PaginatedJsonQueryElasticsearch takes an input > > > > > flowfile, its internal "process session" remains active until the > > > processor > > > > > completes and commits is session - this happens when there are no > more > > > > > results to retrieve from Elasticsearch, at which point the input > > > flowfile > > > > > disappears from the input queue and all output flowfiles appear in > the > > > > > output queues. This is how the nifi framework handles sessions, but > > > can be > > > > > confusing if you're not aware of that beforehand. > > > > > > > > > > SearchElasticsearch is different in this regard because its session > > > ends > > > > > after every iteration (determined by the "Search Results Split", > e.g. > > > this > > > > > could be per page or per entire query), then uses nifi state to > setup > > > the > > > > > next iteration. This means you could start to see output flowfiles > > > sooner. > > > > > > > > > > > > > > > [1] > > > > > > > > > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/additionalDetails.html > > > > > > > > > > On 2023/08/17 22:13:22 Richard Beare wrote: > > > > > > Thanks, that makes sense. I've had trouble getting a size > parameter > > > > > > accepted, but will work on that later. > > > > > > > > > > > > However, I'm unsure what I should expect to see in the following > test > > > > > > scenario. > > > > > > > > > > > > A fixed query in the Query parameter - a match all. i.e. nothing > > > dynamic > > > > > > set by upstream processing > > > > > > > > > > > > An empty input flowfile to trigger activity. > > > > > > > > > > > > The test index is large. (20M docs) > > > > > > > > > > > > Do I expect the processor to begin filling the output queue as > fast > > > as it > > > > > > can, with one flowfile per received page, pausing as the queue > fills? > > > > > > That was what I was anticipating, but at the moment I'm getting > no > > > output > > > > > > and the input flowfile isn't being consumed. I suspect one flag > is > > > wrong, > > > > > > but can't see it. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Aug 18, 2023 at 12:06 AM Chris Sampson < > [email protected]> > > > > > wrote: > > > > > > > > > > > > > Again, sounds like it's working as documented [1] - an input is > > > > > required > > > > > > > to trigger the PaginatedJsonQueryElasticsearch processor, so > > > something > > > > > like > > > > > > > GenerateFlowFile is a way to achieve that if you want to > > > periodically > > > > > > > execute a paginated query, e.g. by setting the Generate > processor's > > > > > > > schedule to run every hour, or use cron syntax, etc. The > advantage > > > with > > > > > > > this processor is that you can use the output of another > processor > > > > > (e.g. > > > > > > > build a query using the results of another processor, such as > an > > > > > initial > > > > > > > query of Elasticsearch) to trigger the paginated query of > > > > > Elasticsearch, > > > > > > > but once the query is finished, the processor won't keep > firing. > > > > > > > > > > > > > > Conversely, SearchElasticsearch does not allow incoming > > > connections, > > > > > but > > > > > > > only triggers the same query on the defined schedule. If the > query > > > > > needs to > > > > > > > use parameters (or some sort of variable), you need to figure > out > > > how > > > > > to > > > > > > > apply that in the Query parameter of the processor - it could > be by > > > > > > > Elasticsearch notation (e.g. "now/d" for the start of the > current > > > day > > > > > in a > > > > > > > date range filter), or something that can be achieved using > NiFi > > > > > Expression > > > > > > > Language [2], but without the flexibility of providing inputs > in > > > > > FlowFile > > > > > > > content, which could be the output of a previous query, or > > > > > > > GenerateFlowFile, etc. > > > > > > > > > > > > > > You need to figure out what query you want to run, what > input(s) > > > are > > > > > > > appropriate, and the schedule to which you want to execute. > > > > > > > > > > > > > > The Search processor is aimed more at a use case of "I want to > > > > > continually > > > > > > > retrieve the contents of an Elasticsearch index/query as it is > > > > > populated > > > > > > > from an extremal source", PaginatedQuery is more for "I want to > > > > > retrieve > > > > > > > data from Elasticsearch that match a query"; both processors > are > > > meant > > > > > to > > > > > > > "allow for the possibility of many documents to be retrieved". > > > > > > > > > > > > > > For various reasons, neither processor was designed to hold > state > > > > > between > > > > > > > initiation of paginated queries, e.g. they don't follow the > > > pattern of > > > > > a > > > > > > > "Consume" or "List" processor that attempts to retain the > > > knowledge of > > > > > the > > > > > > > "last timestamp" within NiFi itself. That's something that > could be > > > > > > > considered, but would need a code change (feel free to raise a > jira > > > > > ticket > > > > > > > for the future [3] if you think that would be helpful). One of > the > > > > > reasons > > > > > > > for this is that, unlike an S3 Bucket (for example), documents > are > > > not > > > > > > > guaranteed to always be indexed within Elasticsearch in > order/with > > > > > such an > > > > > > > "updated at" field, although one could design their system that > > > way, of > > > > > > > course. > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html > > > > > > > > > > > > > > [2] > > > > > > > > > > > > > > > > https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html > > > > > > > > > > > > > > [3] https://issues.apache.org/jira/browse/NIFI > > > > > > > > > > > > > > On 2023/08/17 12:43:31 Richard Beare wrote: > > > > > > > > I must be missing something simple. I've copied the > parameters > > > and > > > > > query > > > > > > > > from the SearchElasticSearch processor and I'm not getting > > > errors, > > > > > but no > > > > > > > > flowfiles are produced. > > > > > > > > > > > > > > > > I'm forced to add an input connection, despite coding the > query > > > in > > > > > the > > > > > > > > Query property. I have a GenerateFlowFile processor > connected. > > > I'm > > > > > > > using.a > > > > > > > > basic match all as a starting point > > > > > > > > { > > > > > > > > "query" : > > > > > > > > { > > > > > > > > "match_all" : {} > > > > > > > > } > > > > > > > > } > > > > > > > > > > > > > > > > Sending the query via curl appears to work OK - I get a page > of > > > stuff > > > > > > > back. > > > > > > > > I'm using nifi 1.20. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 17, 2023 at 2:24 PM Chris Sampson < > [email protected] > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Elasticsearch doesn't have a CDC-like capability (it > doesn't > > > > > maintain a > > > > > > > > > transaction log or such), so that approach isn't possible. > > > > > > > > > > > > > > > > > > What I've done previously is to maintain an audit log in a > > > separate > > > > > > > index > > > > > > > > > within elasticsearch to track what data I've previously > posted, > > > > > e.g. > > > > > > > this > > > > > > > > > might be the last "updated_date" value read from the data > > > index in > > > > > a > > > > > > > > > previous run of the nifi processor. So your nifi Flow > would be > > > > > > > something > > > > > > > > > like: > > > > > > > > > > > > > > > > > > Query for latest processed updated_date > paginated query > for > > > all > > > > > new > > > > > > > data > > > > > > > > > > determine new latest updated_date (e.g. using > QueryRecord) > > > > put > > > > > new > > > > > > > > > latest updated_date into elasticsearch, ready for the next > run > > > > > > > > > > > > > > > > > > On 2023/08/16 23:15:19 Richard Beare wrote: > > > > > > > > > > One further question - what is the recommended way of > > > checking > > > > > for > > > > > > > > > updates > > > > > > > > > > in an index and fetching new records in a similar manner > to > > > > > > > > > > GenerateTableFetch for an sql DB? > > > > > > > > > > Thanks > > > > > > > > > > > > > > > > > > > > On Thu, Aug 17, 2023 at 7:21 AM Richard Beare < > > > > > > > [email protected]> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Sounds perfect. Thanks > > > > > > > > > > > > > > > > > > > > > > On Thu, Aug 17, 2023 at 5:11 AM Chris Sampson < > > > > > [email protected]> > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > >> What you describe sounds like the processor is > working as > > > > > > > designed & > > > > > > > > > > >> documented, i.e. it will restart the same query once > it > > > has > > > > > > > reached > > > > > > > > > the end > > > > > > > > > > >> of the paginated scroll (or search_after, or > > > point-in-time) > > > > > query. > > > > > > > > > > >> > > > > > > > > > > >> Instead, it sounds like you want to try using the > > > > > > > > > > >> PaginatedJsonQueryElasticsearch [1] processor instead. > > > This > > > > > will > > > > > > > > > execute > > > > > > > > > > >> the query given to it, either as the query property > or the > > > > > body > > > > > > > of an > > > > > > > > > > >> incoming FlowFile, output the results, and then stop. > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> [1] > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-elasticsearch-restapi-nar/1.23.0/org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch/index.html > > > > > > > > > > >> > > > > > > > > > > >> On 2023/08/16 07:57:43 Richard Beare wrote: > > > > > > > > > > >> > Hi, > > > > > > > > > > >> > I am using the SearchElasticSearch (1.20.0) > processor to > > > > > > > retrieve > > > > > > > > > all > > > > > > > > > > >> > documents (~20M) from an index, process and > eventually > > > > > return > > > > > > > > > results > > > > > > > > > > >> to a > > > > > > > > > > >> > new index, although for this test I'm retrieving and > > > > > processing > > > > > > > then > > > > > > > > > > >> > discarding. I'm using opensearch. > > > > > > > > > > >> > > > > > > > > > > > >> > My problem is that the process restarts after > > > completion - I > > > > > > > > > discovered > > > > > > > > > > >> > this, and docs confirm, after seeing warnings from > my > > > > > processing > > > > > > > > > code > > > > > > > > > > >> > (which reformats json ready for other work) being > > > repeated > > > > > for > > > > > > > the > > > > > > > > > same > > > > > > > > > > >> > document ID. > > > > > > > > > > >> > > > > > > > > > > > >> > How do I configure the processor to stop after the > > > > > completing > > > > > > > the > > > > > > > > > first > > > > > > > > > > >> > query. > > > > > > > > > > >> > > > > > > > > > > > >> > I've tried the following: > > > > > > > > > > >> > > > > > > > > > > > >> > Query: {"query" : {"match_all" :{}}} > > > > > > > > > > >> > > > > > > > > > > > >> > with pagination_type SCROLL > > > > > > > > > > >> > > > > > > > > > > > >> > I haven't found a combination of the properties that > > > doesn't > > > > > > > lead to > > > > > > > > > > >> > repeated cycles through the index. > > > > > > > > > > >> > > > > > > > > > > > >> > I've also tried {"query" : {"match_all" :{}}, > "sort" : > > > > > > > > > > >> [{"Visit_DateTime" : > > > > > > > > > > >> > "asc"]}} > > > > > > > > > > >> > > > > > > > > > > > >> > and SEARCH_AFTER pagination type, with the same > problem. > > > > > > > > > > >> > > > > > > > > > > > >> > What am I missing? > > > > > > > > > > >> > Thanks > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
