Hi Everyone,

Still struggling to fix this issue, and I may need to try some different
things.

What is the recommended way of transforming a record structure? At the
moment I have a groovy script doing this, but the downstream processing is
very slow, as discussed in the preceding thread. The transformation is very
simple - the starting structure is:

{
  "result" : {
    "text" : " document text",
    "metadata" : {
      "X-TIKA:Parsed-By" : [ "org.apache.tika.parser.pdf.PDFParser" ],
      "X-OCR-Applied" : true,
      "dcterms:created" : "2018-07-24T15:04:51Z",
      "Content-Type" : "application/pdf",
      "Page-Count" : 2
    },
    "success" : true,
    "timestamp" : "2022-12-20T09:02:27.902Z",
    "processingElapsedTime" : 6
  }
}

and the final structure is:

[ {
  "doc_id" : 58,
  "doc_text" : " ",
  "processing_timestamp" : "2022-12-20T09:02:27.902Z",
  "metadata_x_ocr_applied" : true,
  "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
  "metadata_content_type" : "application/pdf",
  "metadata_page_count" : 1,
  "metadata_creation_date" : null,
  "metadata_last_modified" : null
} ]

So a kind of flattening of the structure. Is there a processor I should be
using to do this instead of a groovy script?
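(For what it's worth, the closest built-in option I've found so far is
JoltTransformJSON, or JoltTransformRecord with the existing reader/writer
services. A rough shift spec for the flattening - untested, and note that
doc_id isn't present in the source document, so I assume it would still
have to come from a flowfile attribute or a separate update step - might
look something like:

[ {
  "operation" : "shift",
  "spec" : {
    "result" : {
      "text" : "doc_text",
      "timestamp" : "processing_timestamp",
      "metadata" : {
        "X-OCR-Applied" : "metadata_x_ocr_applied",
        "X-TIKA:Parsed-By" : {
          "0" : "metadata_x_parsed_by"
        },
        "Content-Type" : "metadata_content_type",
        "Page-Count" : "metadata_page_count",
        "dcterms:created" : "metadata_creation_date"
      }
    }
  }
} ]

The "0" entry takes the first element of the X-TIKA:Parsed-By array;
joining several parser names with ";" would need an extra modify
operation. Would that be the right direction?)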
Thanks

On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <richard.be...@gmail.com> wrote:

> Any thoughts on this? Are there some extra steps required when creating an
> avro file from a user defined schema?
>
> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <richard.be...@gmail.com>
> wrote:
>
>> Here's another result that I think suggests there's something wrong with
>> the avro files created by the groovy script, although I can't see what
>> the problem might be.
>>
>> The test is as follows. Output of the groovy script creating avro files
>> is passed to convertrecord, configured with an avro reader and json
>> writer. This is slow. The json output is then converted back to avro with
>> another convertrecord processor, configured with a jsontreereader and an
>> avro writer - this is fast, instantly emptying the queue. The result of
>> that is fed into the previously problematic merge processor, which works
>> exactly as expected, producing flowfiles with 100 records each.
>>
>> The difference I can see between the two flowfiles is the way in which
>> the schema is specified. Perhaps some extras are required in the groovy
>> file to set that up?
>>
>> The slow one has:
>>
>> {"type":"record","name":"document", "fields":[{
>>
>> The fast one:
>>
>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
>>
>> Initial characters are also slightly different.
>> Slow one:
>>
>> 0000000 O b j 001 002 026 a v r o . s c h e m
>> 0000020 a 346 \n { " t y p e " : " r e c o
>>
>> Fast one:
>>
>> 0000000 O b j 001 004 026 a v r o . s c h e m
>> 0000020 a 362 \b { " t y p e " : " r e c o
>>
>> The groovy script is
>> https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy
>>
>> The schema is
>> https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-schemas/document.avsc
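>>
>> (Decoding those first bytes against the Avro spec, as I understand it:
>> the byte after "Obj\001" is the zigzag-encoded count of entries in the
>> file header's metadata map, so the fast file appears to carry two entries
>> - presumably avro.schema plus avro.codec - while the script's file
>> carries only avro.schema. I wouldn't expect that alone to cause slowness.
>> For reference, a minimal sketch of the kind of thing the script does -
>> not the actual script, with field values hard-coded and the flowfile
>> OutputStream ('out') assumed - is:
>>
>> import org.apache.avro.Schema
>> import org.apache.avro.file.CodecFactory
>> import org.apache.avro.file.DataFileWriter
>> import org.apache.avro.generic.GenericData
>> import org.apache.avro.generic.GenericDatumWriter
>> import org.apache.avro.generic.GenericRecord
>>
>> // parse the user-defined schema (document.avsc)
>> def schema = new Schema.Parser().parse(new File('document.avsc'))
>>
>> // populate one record; the real values come from the tika json
>> def record = new GenericData.Record(schema)
>> record.put('doc_id', '58')
>> record.put('doc_text', 'document text')
>> record.put('processing_timestamp', '2022-12-20T09:02:27.902Z')
>> record.put('metadata_x_ocr_applied', true)
>> record.put('metadata_x_parsed_by', 'org.apache.tika.parser.pdf.PDFParser')
>> record.put('metadata_content_type', 'application/pdf')
>> record.put('metadata_page_count', 2)
>> record.put('metadata_creation_date', null)
>> record.put('metadata_last_modified', null)
>>
>> // write the avro data file; an explicit setCodec() also writes the
>> // avro.codec header entry that the fast file seems to have
>> def writer = new DataFileWriter<GenericRecord>(
>>     new GenericDatumWriter<GenericRecord>(schema))
>> writer.setCodec(CodecFactory.nullCodec())
>> writer.create(schema, out)
>> writer.append(record)
>> writer.close()
>>
>> I can give that a try and see whether the header difference goes away.)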
>>
>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <richard.be...@gmail.com>
>> wrote:
>>
>>> I'm diving into the convertrecord tests a bit deeper on the production
>>> server.
>>>
>>> The first test case - 259 documents, a total of 1M when in avro format
>>> in the input queue to the convertrecord processor. These avro files were
>>> not created by the groovy script - they start life as a database query,
>>> and the text field is in one of the columns. The convertrecord processor
>>> runs very quickly - click start, press refresh and it is done. The avro
>>> ends up like this:
>>>
>>> [ {
>>>   "sampleid" : 1075,
>>>   "typeid" : 98,
>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>   "document" : "Text removed",
>>>   "docid" : "9"
>>> } ]
>>>
>>> In the second test, where the text fields are extracted from pdf by tika
>>> before avro files are created by the groovy script (from the tika json
>>> output), the total queue size for the 259 documents is larger - 1.77MB -
>>> and the performance is very different - press start, click refresh and
>>> only two flowfiles are processed.
>>>
>>> [ {
>>>   "doc_id" : "70",
>>>   "doc_text" : "text removed",
>>>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
>>>   "metadata_x_ocr_applied" : true,
>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>>>   "metadata_content_type" : "application/pdf",
>>>   "metadata_page_count" : 1,
>>>   "metadata_creation_date" : null,
>>>   "metadata_last_modified" : null
>>> } ]
>>>
>>> I've noticed that the second one has a content.type attribute of
>>> 'application/json', which doesn't seem right and doesn't match the fast
>>> case. I'll see what happens if I change that.
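>>>
>>> (If the attribute matters downstream, I assume the script could set it
>>> explicitly when writing the avro, e.g.
>>>
>>> // hypothetical addition near the end of the script, before the transfer
>>> flowFile = session.putAttribute(flowFile, 'mime.type', 'application/avro-binary')
>>>
>>> which is the mime.type that NiFi's own avro record writer appears to
>>> set.)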
>>>
>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <richard.be...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>> Some progress on debugging options. I've found a flow that exhibits the
>>>> problem using synthetic data. However, the results are host dependent.
>>>> On my laptop a "run-once" click of merge record gives me two flowfiles
>>>> of 100 records, while the same flow on the production server produces
>>>> several much smaller flowfiles. This makes me think that something funny
>>>> is happening with my storage setup that I'll need to chase.
>>>>
>>>> I had tested the convertrecord option (simply avroreader->avrowriter)
>>>> and it did seem slow, but I'll investigate this further as it may be
>>>> related to my storage issue.
>>>>
>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <marka...@hotmail.com> wrote:
>>>>
>>>>> > Is there something about this structure that is likely to be causing
>>>>> the problem? Could there be other issues with the avro generated by the
>>>>> script?
>>>>>
>>>>> I don't think the structure should matter. And as long as the avro
>>>>> produced is proper Avro, I don't think that should matter either.
>>>>> Unless perhaps there's some issue with the Avro library itself that's
>>>>> causing it to take a really long time to parse the Avro or something?
>>>>> I'd be curious - if you take the output of your script and then run it
>>>>> through a ConvertRecord (Avro Reader -> Json Writer), is the
>>>>> ConvertRecord fast? Or is it really slow to process it?
>>>>>
>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare <richard.be...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Further - I performed another test in which I replaced the custom json
>>>>> to avro script with a ConvertRecord processor - merge record appears to
>>>>> work as expected in that case.
>>>>>
>>>>> Output of convertrecord looks like this:
>>>>>
>>>>> [ {
>>>>>   "text" : " No Alert Found \n\n",
>>>>>   "metadata" : {
>>>>>     "X_TIKA_Parsed_By" : null,
>>>>>     "X_OCR_Applied" : null,
>>>>>     "Content_Type" : null
>>>>>   },
>>>>>   "success" : true,
>>>>>   "timestamp" : "2022-12-05T10:49:18.568Z",
>>>>>   "processingElapsedTime" : 0,
>>>>>   "doc_id" : "5.60178607E8"
>>>>> } ]
>>>>>
>>>>> while the output of the script looks like:
>>>>>
>>>>> [ {
>>>>>   "doc_id" : "5.61996505E8",
>>>>>   "doc_text" : " No Alert Found \n\n",
>>>>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>>>>>   "metadata_x_ocr_applied" : true,
>>>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>   "metadata_content_type" : "application/rtf",
>>>>>   "metadata_page_count" : null,
>>>>>   "metadata_creation_date" : null,
>>>>>   "metadata_last_modified" : null
>>>>> } ]
>>>>>
>>>>> Is there something about this structure that is likely to be causing
>>>>> the problem? Could there be other issues with the avro generated by the
>>>>> script?
>>>>>
>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <richard.be...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I've reset the backpressure to the default.
>>>>>>
>>>>>> This remains something of a mystery. The merge with synthetic data
>>>>>> happily creates flowfiles with 100 records, and the join says "Records
>>>>>> merged due to: Bin is full" or "Records merged due to: Bin is full
>>>>>> enough". No timeouts in that case, even with the max bin age at 4.5
>>>>>> seconds. The resulting flowfiles were about 300K.
>>>>>>
>>>>>> The real data is doing much the same as before, producing flowfiles
>>>>>> of about 30K, with 7 records or so. If I increase the maximum bin age
>>>>>> to 30 seconds, the size and record count are higher - 12 to 15.
>>>>>> Nothing like the behaviour with synthetic data, where the 100 record
>>>>>> flowfiles are created almost instantly. Joins are always due to bin
>>>>>> age.
>>>>>>
>>>>>> Can the problem relate to the structure of the avro files? Any way to
>>>>>> dive into that?
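>>>>>>
>>>>>> (One way to dive in might be avro-tools, which can dump the header
>>>>>> metadata and schema of the two files for a direct comparison -
>>>>>> assuming the jar is available, something like:
>>>>>>
>>>>>> java -jar avro-tools-1.11.1.jar getmeta slow.avro
>>>>>> java -jar avro-tools-1.11.1.jar getschema slow.avro
>>>>>> java -jar avro-tools-1.11.1.jar tojson slow.avro | head
>>>>>>
>>>>>> and the same for a fast file. The file names and version here are
>>>>>> placeholders.)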
>>>>>> Everything else about the mergerecord settings appears the same, so
>>>>>> I can't see an explanation as to why the behaviour is different on
>>>>>> the same hardware.
>>>>>>
>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <marka...@hotmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Richard,
>>>>>>>
>>>>>>> So a few things that I've done/looked at.
>>>>>>>
>>>>>>> I generated some Avro data (random JSON that I downloaded from a
>>>>>>> Random JSON Generator and then converted to Avro).
>>>>>>>
>>>>>>> I then ran this avro data into both the MergeRecord processors.
>>>>>>>
>>>>>>> Firstly, I noticed that both are very slow. Found that was because
>>>>>>> Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for
>>>>>>> MergeRecord. And really for basically all processors except for the
>>>>>>> first one in the flow.
>>>>>>>
>>>>>>> I also noticed that you have backpressure set on your connections to
>>>>>>> 40,000 FlowFiles and 4 GB. This can significantly slow things down.
>>>>>>> If you have performance concerns you definitely want backpressure set
>>>>>>> back to the default of 10,000 FlowFiles. Otherwise, as the queues
>>>>>>> fill up they start "swapping out" FlowFiles to disk, and this can
>>>>>>> significantly slow things down.
>>>>>>>
>>>>>>> I noticed that MergeRecord is set to 1 concurrent task. Probably
>>>>>>> worth considering increasing that, if performance is a concern.
>>>>>>>
>>>>>>> That said, I am seeing nice, full bins of 100 records merged from
>>>>>>> each of the MergeRecord processors.
>>>>>>>
>>>>>>> So it is certainly possible that if you're seeing smaller bins it's
>>>>>>> because you're timing out. The 4.5 second timeout is quite short.
>>>>>>> Have you tried increasing that to, say, 30 seconds to see if it gives
>>>>>>> you larger bins? I also recommend that you take a look at the data
>>>>>>> provenance to see why it's creating the bins.
>>>>>>>
>>>>>>> If it's unclear how to do that:
>>>>>>> Right-click on the MergeRecord processor.
>>>>>>> Go to View data provenance.
>>>>>>> Scroll down the list until you see a "JOIN" event type. You can
>>>>>>> ignore the ATTRIBUTES_MODIFIED and DROP events for now.
>>>>>>> Click the 'i' icon on the left-hand side.
>>>>>>> This will show you details about the merge. In the Details tab, if
>>>>>>> you scroll down, it will show you a Details field, which tells you
>>>>>>> why the data was merged. It should say either "Records Merged due to:
>>>>>>> Bin has reached Max Bin Age" or "Records Merged due to: Bin is full".
>>>>>>>
>>>>>>> If it is due to Max Bin Age being reached, then I'd recommend
>>>>>>> increasing the number of concurrent tasks, reducing backpressure to
>>>>>>> no more than 10,000 FlowFiles in the queue, and/or increasing the Max
>>>>>>> Bin Age.
>>>>>>>
>>>>>>> Also worth asking - what kind of machines is this running on? A 64
>>>>>>> core VM with a 1 TB volume will, of course, run MUCH differently than
>>>>>>> a 4 core VM with a 10 GB volume, especially in the cloud.
>>>>>>>
>>>>>>> If you're still having trouble, let me know what the provenance tells
>>>>>>> you about the reason for merging the data, and we can go from there.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> -Mark
>>>>>>>
>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <marka...@hotmail.com> wrote:
>>>>>>>
>>>>>>> Richard,
>>>>>>>
>>>>>>> I think just the flow structure should be sufficient.
>>>>>>>
>>>>>>> Thanks
>>>>>>> -Mark
>>>>>>>
>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <richard.be...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks for responding,
>>>>>>> I re-tested with max bins = 2, but the behaviour remained the same.
>>>>>>> I can easily share a version of the functioning workflow (and data),
>>>>>>> which is part of a public project. The problem workflow (which shares
>>>>>>> many of the same components) is part of a health research project, so
>>>>>>> more difficult. I definitely can't share any data from that one. Do
>>>>>>> you need to see the data or is the overall structure sufficient at
>>>>>>> this point? Happy to demonstrate via video conference too.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <marka...@hotmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Richard,
>>>>>>>>
>>>>>>>> Can you try increasing the Maximum Number of Bins? I think there
>>>>>>>> was an issue that was recently addressed in which the merge
>>>>>>>> processors had an issue when Max Number of Bins = 1.
>>>>>>>>
>>>>>>>> If you still see the same issue, please provide a copy of the flow
>>>>>>>> that can be used to replicate the issue.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> -Mark
>>>>>>>>
>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <richard.be...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Pretty much the same - I seem to end up with flowfiles containing
>>>>>>>> about 7 records, presumably always triggered by the timeout.
>>>>>>>>
>>>>>>>> I had thought the timeout needed to be less than the run schedule,
>>>>>>>> but it looks like it can be the same.
>>>>>>>>
>>>>>>>> Here's a debug dump:
>>>>>>>>
>>>>>>>> 10:13:43 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:43 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:44 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>>
>>>>>>>> 10:13:44 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:44 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:45 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using session StandardProcessSession[id=83204] for RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:48 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:48 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:49 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>>
>>>>>>>> 10:13:49 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:49 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:52 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using session StandardProcessSession[id=83205] for RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:54 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:54 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:55 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>>
>>>>>>>> 10:13:55 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:55 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired. Completing bin.
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because complete() was called
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using session StandardProcessSession[id=83205] for RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, id=1068845]
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>>>>>
>>>>>>>> 10:14:01 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:01 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:02 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>>
>>>>>>>> 10:14:02 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:02 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, full=false, isComplete=false, id=4024] is now expired. Completing bin.
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because complete() was called
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using session StandardProcessSession[id=83206] for RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, id=1069492]
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>>>>>
>>>>>>>> 10:14:07 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> 10:14:07 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> 10:14:08 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>>
>>>>>>>> 10:14:08 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> 10:14:08 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <joe.w...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello
>>>>>>>>>
>>>>>>>>> Run schedule should be 0.
>>>>>>>>>
>>>>>>>>> 50 should be the min number of records.
>>>>>>>>>
>>>>>>>>> 5 seconds is the max bin age it sounds like you want.
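>>>>>>>>>
>>>>>>>>> i.e. roughly, in terms of the processor settings (Run Schedule is
>>>>>>>>> on the Scheduling tab, the rest are MergeRecord properties):
>>>>>>>>>
>>>>>>>>> Run Schedule              = 0 sec
>>>>>>>>> Merge Strategy            = Bin-Packing Algorithm
>>>>>>>>> Minimum Number of Records = 50
>>>>>>>>> Maximum Number of Records = 100
>>>>>>>>> Max Bin Age               = 5 sec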
>>>>>>>>>
>>>>>>>>> Start with these changes and let us know what you're seeing.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <
>>>>>>>>> richard.be...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I'm having a great deal of trouble configuring the mergerecord
>>>>>>>>>> processor to deliver reasonable performance, and I'm not sure
>>>>>>>>>> where to look to correct it. One of my upstream processors
>>>>>>>>>> requires a single record per flowfile, but I'd like to create
>>>>>>>>>> larger flowfiles before passing to the next stage. The flowfiles
>>>>>>>>>> are independent at this stage, so there's no special processing
>>>>>>>>>> required of the merging. I'd like to create flowfiles of about 50
>>>>>>>>>> to 100 records.
>>>>>>>>>>
>>>>>>>>>> I have two tests, both running on the same nifi system. One uses
>>>>>>>>>> synthetic data, the other the production data. The performance of
>>>>>>>>>> the mergerecord processor for the synthetic data is as I'd
>>>>>>>>>> expect, and I can't figure out why the production data is so much
>>>>>>>>>> slower. Here's the configuration:
>>>>>>>>>>
>>>>>>>>>> mergerecord has the following settings: timer driven, 1 concurrent
>>>>>>>>>> task, 5 second run schedule, bin packing merge strategy, min
>>>>>>>>>> records = 1, max records = 100, max bin age = 4.5 secs, maximum
>>>>>>>>>> number of bins = 1.
>>>>>>>>>>
>>>>>>>>>> In the case of synthetic data the typical flowfile size is in the
>>>>>>>>>> range 2 to 7KB. The size of flowfiles for the production case is
>>>>>>>>>> smaller - typically around 1KB.
>>>>>>>>>>
>>>>>>>>>> The structure in the tests is slightly different. Synthetic is
>>>>>>>>>> (note that I've removed the text part):
>>>>>>>>>>
>>>>>>>>>> [ {
>>>>>>>>>>   "sampleid" : 1075,
>>>>>>>>>>   "typeid" : 98,
>>>>>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>>>>>   "document" : "Text removed - typically a few hundred words",
>>>>>>>>>>   "docid" : "9"
>>>>>>>>>> } ]
>>>>>>>>>>
>>>>>>>>>> Production is:
>>>>>>>>>>
>>>>>>>>>> [ {
>>>>>>>>>>   "doc_id" : "5.60622895E8",
>>>>>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>>>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>>>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>>>>>   "metadata_page_count" : null,
>>>>>>>>>>   "metadata_creation_date" : null,
>>>>>>>>>>   "metadata_last_modified" : null
>>>>>>>>>> } ]
>>>>>>>>>>
>>>>>>>>>> I load up the queue feeding the mergerecord processor with
>>>>>>>>>> several hundred individual flowfiles and activate it. The
>>>>>>>>>> synthetic data is nicely placed into chunks of 100, with any
>>>>>>>>>> remainder being flushed in a smaller chunk. The production data
>>>>>>>>>> is generally bundled into groups of 6 records, sometimes fewer.
>>>>>>>>>> Certainly it never gets close to 100 records.
>>>>>>>>>>
>>>>>>>>>> Any ideas as to what I should look at to track down the
>>>>>>>>>> difference?
>>>>>>>>>>
>>>>>>>>>> Thanks