The script generating the avro files is: https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy
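One thing that can be checked without NiFi is the schema the script embeds. The sketch below is only that, a sketch: it assumes Python is to hand, and the function name and the trimmed schema string are illustrative, copied down to four fields from the "Got Group ID" schema in the debug dump quoted further down. The Avro specification defines timestamp-millis as annotating a long, whereas this schema attaches it to a string. Whether that has any bearing on the merge behaviour is not established anywhere in this thread.

```python
import json

# Base types each logical type may annotate, per the Avro specification.
# Only a few logical types are listed here (assumption: extend as needed).
EXPECTED_BASE = {
    "timestamp-millis": {"long"},
    "timestamp-micros": {"long"},
    "date": {"int"},
    "time-millis": {"int"},
}


def find_logical_type_mismatches(schema, path=""):
    """Return (field path, logicalType, base type) for annotations on an unexpected base type.

    Hypothetical helper: walks records and unions only, not arrays or maps.
    """
    found = []
    if isinstance(schema, list):  # a union: check every branch
        for branch in schema:
            found += find_logical_type_mismatches(branch, path)
    elif isinstance(schema, dict):
        logical = schema.get("logicalType")
        base = schema.get("type")
        if (logical in EXPECTED_BASE and isinstance(base, str)
                and base not in EXPECTED_BASE[logical]):
            found.append((path, logical, base))
        if base == "record":
            for field in schema.get("fields", []):
                field_path = f"{path}.{field['name']}".lstrip(".")
                found += find_logical_type_mismatches(field["type"], field_path)
        elif isinstance(base, (dict, list)):  # nested type definition
            found += find_logical_type_mismatches(base, path)
    return found


# Trimmed to four fields from the schema shown in the debug dump.
SCHEMA_TEXT = (
    '{"type":"record","name":"document","fields":['
    '{"name":"doc_id","type":"string"},'
    '{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},'
    '{"name":"metadata_page_count","type":["null","int"],"default":null},'
    '{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}'
    ']}'
)

mismatches = find_logical_type_mismatches(json.loads(SCHEMA_TEXT))
for field_path, logical, base in mismatches:
    print(f"{field_path}: logicalType {logical} annotates {base}")
```

Running the same function over the full schema text from the log would also cover metadata_last_modified, which is declared the same way.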
On Mon, Dec 5, 2022 at 9:58 PM Richard Beare <richard.be...@gmail.com> wrote:
> Further - I performed another test in which I replaced the custom json to
> avro script with a ConvertRecord processor - merge record appears to work
> as expected in that case.
>
> Output of convertrecord looks like this:
>
> [ {
>   "text" : " No Alert Found \n\n",
>   "metadata" : {
>     "X_TIKA_Parsed_By" : null,
>     "X_OCR_Applied" : null,
>     "Content_Type" : null
>   },
>   "success" : true,
>   "timestamp" : "2022-12-05T10:49:18.568Z",
>   "processingElapsedTime" : 0,
>   "doc_id" : "5.60178607E8"
> } ]
>
> while the output of the script looks like:
>
> [ {
>   "doc_id" : "5.61996505E8",
>   "doc_text" : " No Alert Found \n\n",
>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>   "metadata_x_ocr_applied" : true,
>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>   "metadata_content_type" : "application/rtf",
>   "metadata_page_count" : null,
>   "metadata_creation_date" : null,
>   "metadata_last_modified" : null
> } ]
>
> Is there something about this structure that is likely to be causing the
> problem? Could there be other issues with the avro generated by the script?
>
> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <richard.be...@gmail.com> wrote:
>
>> I've reset the backpressure to the default.
>>
>> This remains something of a mystery. The merge with synthetic data
>> happily creates flowfiles with 100 records, and the join says "Records
>> merged due to: Bin is full" or "Records merged due to: Bin is full enough".
>> No timeouts in that case, even with the max bin age at 4.5 seconds. The
>> resulting flowfiles were about 300K.
>>
>> The real data is doing much the same as before, producing flowfiles of
>> about 30K, with 7 records or so. If I increase the maximum bin age to 30
>> seconds the size and record count are higher - 12 to 15.
>> Nothing like the behaviour with synthetic data, where the 100 record
>> flowfiles are created almost instantly. Joins are always due to bin age.
>>
>> Can the problem relate to the structure of the avro files? Any way to
>> dive into that? Everything else about the mergerecord settings appears the
>> same, so I can't see an explanation as to why the behaviour is different on
>> the same hardware.
>>
>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <marka...@hotmail.com> wrote:
>>
>>> Hey Richard,
>>>
>>> So a few things that I’ve done/looked at.
>>>
>>> I generated some Avro data (random JSON that I downloaded from a Random
>>> JSON Generator and then converted to Avro).
>>>
>>> I then ran this avro data into both the MergeRecord processors.
>>>
>>> Firstly, I noticed that both are very slow. Found that was because Run
>>> Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for
>>> MergeRecord. And really for basically all processors except for the first
>>> one in the flow.
>>>
>>> I also notice that you have backpressure set on your connections to
>>> 40,000 FlowFiles and 4 GB. This can significantly slow things down. If you
>>> have performance concerns you definitely want backpressure set back to the
>>> default of 10,000 FlowFiles. Otherwise, as the queues fill up they start
>>> “swapping out” FlowFiles to disk, and this can significantly slow things
>>> down.
>>>
>>> I noticed that MergeRecord is set to 1 concurrent task. Probably worth
>>> considering increasing that, if performance is a concern.
>>>
>>> That said, I am seeing nice, full bins of 100 records merged from each
>>> of the MergeRecord processors.
>>> So it is certainly possible that if you’re seeing smaller bins it’s
>>> because you’re timing out. The 4.5 seconds timeout is quite short. Have you
>>> tried increasing that to say 30 seconds to see if it gives you larger bins?
>>> I also recommend that you take a look at the data provenance to see why
>>> it’s creating the bins.
>>>
>>> If unclear how to do that:
>>> Right-click on the MergeRecord processor
>>> Go to View data provenance
>>> Scroll down the list until you see a “JOIN” event type. You can ignore
>>> the ATTRIBUTES_MODIFIED and DROP events for now.
>>> Click the ‘i’ icon on the left-hand side.
>>> This will show you details about the merge. In the Details tab, if you
>>> scroll down, it will show you a Details field, which tells you why the data
>>> was merged. It should either say: “Records Merged due to: Bin has reached
>>> Max Bin Age” or “Records Merged due to: Bin is full”
>>>
>>> If it is due to Max Bin Age reached, then I’d recommend increasing the
>>> number of concurrent tasks, reducing backpressure to no more than 10,000
>>> FlowFiles in the queue, and/or increasing the Max Bin Age.
>>> Also worth asking - what kind of machines is this running on? A 64 core
>>> VM with 1 TB volume will, of course, run MUCH differently than a 4 core VM
>>> with a 10 GB volume, especially in the cloud.
>>>
>>> If still having trouble, let me know what the provenance tells you about
>>> the reason for merging the data, and we can go from there.
>>>
>>> Thanks!
>>> -Mark
>>>
>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <marka...@hotmail.com> wrote:
>>>
>>> Richard,
>>>
>>> I think just the flow structure should be sufficient.
>>>
>>> Thanks
>>> -Mark
>>>
>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <richard.be...@gmail.com> wrote:
>>>
>>> Thanks for responding,
>>> I re-tested with max bins = 2, but the behaviour remained the same. I
>>> can easily share a version of the functioning workflow (and data), which is
>>> part of a public project. The problem workflow (which shares many of the
>>> same components) is part of a health research project, so more difficult. I
>>> definitely can't share any data from that one.
>>> Do you need to see the data or is the overall structure sufficient at
>>> this point? Happy to demonstrate via video conference too.
>>>
>>> Thanks
>>>
>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <marka...@hotmail.com> wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> Can you try increasing the Maximum Number of Bins? I think there was an
>>>> issue that was recently addressed in which the merge processors had an
>>>> issue when Max Number of Bins = 1.
>>>>
>>>> If you still see the same issue, please provide a copy of the flow that
>>>> can be used to replicate the issue.
>>>>
>>>> Thanks
>>>> -Mark
>>>>
>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <richard.be...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Pretty much the same - I seem to end up with flowfiles containing about
>>>> 7 records, presumably always triggered by the timeout.
>>>>
>>>> I had thought the timeout needed to be less than the run schedule, but
>>>> it looks like it can be the same.
>>>>
>>>> Here's a debug dump
>>>>
>>>> 10:13:43 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>>
>>>> 10:13:43 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>
>>>> 10:13:44 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>> >>>> 10:13:44 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021] >>>> >>>> 10:13:44 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021] >>>> >>>> 10:13:45 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021] >>>> >>>> 10:13:46 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00] >>>> >>>> 10:13:46 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:46 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created >>>> OutputStream using session StandardProcessSession[id=83204] for >>>> RecordBin[size=0, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:46 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:48 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:48 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:49 UTC >>>> DEBUG >>>> 
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>> >>>> 10:13:49 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:49 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022] >>>> >>>> 10:13:52 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022] >>>> >>>> 10:13:53 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>> >>>> 10:13:53 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:53 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created >>>> OutputStream using session StandardProcessSession[id=83205] for >>>> RecordBin[size=0, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:53 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:54 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:54 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:55 UTC >>>> DEBUG >>>> 
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e] >>>> >>>> 10:13:55 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:55 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023] >>>> >>>> 10:13:58 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, >>>> full=false, isComplete=false, id=4023] is now expired. Completing bin. 
>>>> >>>> 10:13:58 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked >>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because >>>> complete() was called >>>> >>>> 10:13:58 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record >>>> Writer using session StandardProcessSession[id=83205] for RecordBin[size=6, >>>> full=false, isComplete=true, id=4023] >>>> >>>> 10:13:58 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin >>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with >>>> Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] >>>> using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, >>>> id=1068800, id=1068845] >>>> >>>> 10:13:58 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023] >>>> >>>> 10:14:01 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024] >>>> >>>> 10:14:01 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024] >>>> >>>> 10:14:02 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]} >>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8] >>>> >>>> 10:14:02 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024] >>>> >>>> 10:14:02 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024] >>>> >>>> 10:14:05 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, >>>> full=false, isComplete=false, id=4024] is now expired. Completing bin. 
>>>> >>>> 10:14:05 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked >>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because >>>> complete() was called >>>> >>>> 10:14:05 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record >>>> Writer using session StandardProcessSession[id=83206] for RecordBin[size=6, >>>> full=false, isComplete=true, id=4024] >>>> >>>> 10:14:05 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin >>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with >>>> Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] >>>> using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, >>>> id=1069451, id=1069492] >>>> >>>> 10:14:05 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024] >>>> >>>> 10:14:07 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating >>>> id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025] >>>> >>>> 10:14:07 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred >>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025] >>>> >>>> 10:14:08 UTC >>>> DEBUG >>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1 >>>> >>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID >>>> 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>
>>>> 10:14:08 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>
>>>> 10:14:08 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
>>>>
>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <joe.w...@gmail.com> wrote:
>>>>
>>>>> Hello
>>>>>
>>>>> Run schedule should be 0.
>>>>>
>>>>> 50 should be the min number of records
>>>>>
>>>>> 5 seconds is the max bin age it sounds like you want.
>>>>>
>>>>> Start with these changes and let us know what you're seeing.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <richard.be...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm having a great deal of trouble configuring the mergerecord
>>>>>> processor to deliver reasonable performance and I'm not sure where to
>>>>>> look to correct it.
>>>>>> One of my upstream processors requires a single record per
>>>>>> flowfile, but I'd like to create larger flowfiles before passing to the
>>>>>> next stage. The flowfiles are independent at this stage so there's no
>>>>>> special processing required of the merging. I'd like to create flowfiles
>>>>>> of about 50 to 100 records.
>>>>>>
>>>>>> I have two tests, both running on the same nifi system. One uses
>>>>>> synthetic data, the other the production data. The performance of the
>>>>>> mergerecord processor for the synthetic data is as I'd expect, and I
>>>>>> can't figure out why the production data is so much slower. Here's the
>>>>>> configuration:
>>>>>>
>>>>>> mergerecord has the following settings. Timer driven, 1 concurrent
>>>>>> task, 5 second run schedule, bin packing merge strategy, min records = 1,
>>>>>> max records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
>>>>>>
>>>>>> In the case of synthetic data the typical flowfile size is in the
>>>>>> range 2 to 7KB.
>>>>>>
>>>>>> The size of flowfiles for the production case is smaller - typically
>>>>>> around 1KB.
>>>>>>
>>>>>> The structure in the tests is slightly different.
>>>>>> Synthetic is (note that I've removed the text part):
>>>>>>
>>>>>> [ {
>>>>>>   "sampleid" : 1075,
>>>>>>   "typeid" : 98,
>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>   "document" : "Text removed - typically a few hundred words",
>>>>>>   "docid" : "9"
>>>>>> } ]
>>>>>>
>>>>>> Production is:
>>>>>> [ {
>>>>>>   "doc_id" : "5.60622895E8",
>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>   "metadata_page_count" : null,
>>>>>>   "metadata_creation_date" : null,
>>>>>>   "metadata_last_modified" : null
>>>>>> } ]
>>>>>>
>>>>>> I load up the queue feeding the mergerecord processor with several
>>>>>> hundred individual flowfiles and activate it.
>>>>>>
>>>>>> The synthetic data is nicely placed into chunks of 100, with any
>>>>>> remainder being flushed in a smaller chunk.
>>>>>>
>>>>>> The production data is generally bundled into groups of 6 records,
>>>>>> sometimes fewer. Certainly it never gets close to 100 records.
>>>>>>
>>>>>> Any ideas as to what I should look at to track down the difference?
>>>>>>
>>>>>> Thanks
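For anyone wanting to tally why bins closed from a debug dump like the one above rather than reading it by eye, here is a rough sketch. It assumes Python, and that the quote markers have been stripped from the log text first. The function name is made up for illustration, and the two regular expressions are based only on the message wording visible in this thread, so they may need adjusting for other NiFi versions.

```python
import re

# Patterns modelled on the DEBUG messages quoted in this thread (assumption:
# other NiFi versions may word these messages differently).
EXPIRED = re.compile(
    r"RecordBin\[size=(\d+), full=(true|false), isComplete=(?:true|false), id=(\d+)\]"
    r" is now expired"
)
COMPLETED = re.compile(
    r"Completed bin RecordBin\[size=(\d+), full=(true|false), isComplete=true, id=(\d+)\]"
    r" with (\d+) records"
)


def summarise_bins(log_text):
    """Hypothetical helper: one entry per completed bin, noting whether it expired first."""
    # Collapse wrapped lines so each message can match as a single run of text.
    text = " ".join(log_text.split())
    expired_ids = {int(m.group(3)) for m in EXPIRED.finditer(text)}
    summary = []
    for m in COMPLETED.finditer(text):
        bin_id = int(m.group(3))
        summary.append({
            "bin": bin_id,
            "records": int(m.group(4)),
            "full": m.group(2) == "true",
            "expired": bin_id in expired_ids,
        })
    return summary


# Two bins taken from the debug dump, with quote markers removed.
SAMPLE_LOG = """
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
full=false, isComplete=false, id=4023] is now expired. Completing bin.
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with
Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948]
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
full=false, isComplete=false, id=4024] is now expired. Completing bin.
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with
Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d]
"""

for entry in summarise_bins(SAMPLE_LOG):
    print(entry)
```

A bin reported with expired set and full unset corresponds to the "Bin has reached Max Bin Age" reason that the provenance view would show for the same merge.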