Abel,

For (1), see the documentation at [1] on the processor's Scheduling tab; you can schedule ExecuteSQL to run once per day (for example, with the CRON driven scheduling strategy).

For (2), I recommend using ExecuteSQLRecord instead of ExecuteSQL unless you need the intermediate output to be Avro. Writing JSON or CSV is often faster.

For (3), it depends on which database you're using. NiFi has a CaptureChangeMySQL processor for MySQL change data capture (CDC). If you're using a different database, you'll need to consult its documentation on how to send change data somewhere, but NiFi has many ways to receive such data (e.g. HTTP/S or Kafka). I also found a series of articles [2] that illustrate a CDC flow, if that's what you're looking for.

If there is no CDC solution for your database, you can emulate replication to Elasticsearch by fetching the whole source database once a day and setting the Index Operation property of the PutElasticsearchRecord processor to "Upsert". Note that, without a true CDC solution, records deleted at the source won't show up in this flow and thus won't be deleted from the target Elasticsearch index.
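To make the last point concrete, here is a minimal Python sketch of upsert semantics applied to a daily full snapshot. It is a toy model, not the NiFi or Elasticsearch API: a plain dict stands in for the index, the `id` field is assumed to be the source primary key used as the document ID, and the function name `upsert_snapshot` is hypothetical.

```python
# Toy model only: a dict stands in for the Elasticsearch index, keyed by
# document ID (assumed here to be the source table's primary key "id").

def upsert_snapshot(index: dict, rows: list[dict], id_field: str = "id") -> dict:
    """Apply a full snapshot of source rows with upsert semantics.

    Each row either creates a new document or updates the existing one
    with the same ID. Documents whose IDs are absent from the snapshot
    are left untouched, which is why deletions are not propagated.
    """
    for row in rows:
        doc_id = row[id_field]
        existing = index.get(doc_id, {})
        # Update-or-create: merge the row's field values into any existing doc.
        index[doc_id] = {**existing, **row}
    return index


if __name__ == "__main__":
    es_index: dict = {}
    day1 = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
    upsert_snapshot(es_index, day1)

    # Day 2: row 1 was modified, row 2 was deleted at the source, row 3 is new.
    day2 = [{"id": 1, "name": "alicia"}, {"id": 3, "name": "carol"}]
    upsert_snapshot(es_index, day2)

    print(sorted(es_index))  # [1, 2, 3]: the deleted row 2 is still present
```

Modified and new rows are picked up, but row 2 survives in the index. Catching such stale documents would require comparing the set of IDs in the source with those in the index, which a daily full fetch with upserts alone does not do.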
Regards,
Matt

[1] https://nifi.apache.org/nifi-docs/user-guide.html#scheduling-tab
[2] https://community.cloudera.com/t5/Community-Articles/Change-Data-Capture-CDC-with-Apache-NiFi-Part-1-of-3/ta-p/246623

On Mon, Jan 27, 2025 at 5:30 PM Abel Bellati <[email protected]> wrote:
> Good evening,
>
> I am a beginner with Apache NiFi, and I have a question about automating
> my data flow.
>
> Here is what I have done so far:
>
> • I created my first *“ExecuteSQL”* processor, which successfully
> connects to the database.
>
> • The query executes without any issues, and I retrieve the data correctly.
>
> • The data is then sent directly to *“PutElasticsearchRecord”* processor
> to be stored in Elasticsearch.
>
> When I check *“View Data Provenance”*, I see several files (Drop, Fork,
> Send).
>
> *My question is the following:*
>
> I would like to:
>
> 1. *Schedule the “ExecuteSQL” processor to run once a day* (at a specific
> interval).
>
> 2. *Generate a single file containing all the extracted data* and send
> this file to the *“PutElasticsearchRecord”* processor.
>
> 3. Automatically detect and retrieve *modifications in the database* and
> inject them into the respective processors.
>
> Thank you very much for your assistance and guidance in configuring this
> flow.
>
> Best regards,
>
> --
> Abel Bellati
> Université de Limoges
> DSI : SERVICE SUPPORT ET APPLICATION
> DIRECTION GENERALE DES SERVICES
> 123 Avenue Albert THOMAS
> 87060 LIMOGES CEDEX
> Tel.: 0587080801
> Mobile: 06 32 64 94 59
> Extension: 3801
> Web: http://www.unilim.fr
> Mail: [email protected]
