The script generating the avro files is:
https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy

On Mon, Dec 5, 2022 at 9:58 PM Richard Beare <richard.be...@gmail.com>
wrote:

> Further, I performed another test in which I replaced the custom JSON-to-Avro
> script with a ConvertRecord processor; MergeRecord appears to work as
> expected in that case.
>
> The output of ConvertRecord looks like this:
>
> [ {
>   "text" : "  No Alert Found \n\n",
>   "metadata" : {
>     "X_TIKA_Parsed_By" : null,
>     "X_OCR_Applied" : null,
>     "Content_Type" : null
>   },
>   "success" : true,
>   "timestamp" : "2022-12-05T10:49:18.568Z",
>   "processingElapsedTime" : 0,
>   "doc_id" : "5.60178607E8"
> } ]
>
> while the output of the script looks like:
>
> [ {
>   "doc_id" : "5.61996505E8",
>   "doc_text" : "  No Alert Found \n\n",
>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>   "metadata_x_ocr_applied" : true,
>   "metadata_x_parsed_by" :
> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>   "metadata_content_type" : "application/rtf",
>   "metadata_page_count" : null,
>   "metadata_creation_date" : null,
>   "metadata_last_modified" : null
> } ]
>
> Is there something about this structure that is likely to be causing the
> problem? Could there be other issues with the avro generated by the script?
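One concrete thing to check in the script's output: the schema printed in the "Got Group ID" debug lines further down the thread declares `"logicalType":"timestamp-millis"` on `string` fields. The Avro specification defines timestamp-millis only on `long`, and says an invalid logical type should be ignored in favour of the underlying type. The sketch below (standard-library Python, covering only the date/time logical types and only top-level fields) flags such fields; whether this has anything to do with the small bins is not established here.

```python
import json

# Base type each logical type may annotate, per the Avro specification
# (only the date/time logical types are covered in this sketch).
LOGICAL_TYPE_BASES = {
    "date": "int",
    "time-millis": "int",
    "time-micros": "long",
    "timestamp-millis": "long",
    "timestamp-micros": "long",
}


def _check(field_name, avro_type, problems):
    """Append a message if avro_type (or a union branch) misuses a logical type."""
    if isinstance(avro_type, list):  # union such as ["null", {...}]
        for branch in avro_type:
            _check(field_name, branch, problems)
    elif isinstance(avro_type, dict):
        logical = avro_type.get("logicalType")
        expected = LOGICAL_TYPE_BASES.get(logical)
        if expected is not None and avro_type.get("type") != expected:
            problems.append(
                f"{field_name}: {logical} on {avro_type.get('type')}, expected {expected}"
            )


def logical_type_problems(schema_json):
    """Check the top-level fields of a record schema given as a JSON string."""
    schema = json.loads(schema_json)
    problems = []
    for field in schema.get("fields", []):
        _check(field["name"], field["type"], problems)
    return problems
```

Run against the schema from the debug log, this flags `processing_timestamp`, `metadata_creation_date` and `metadata_last_modified`.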
>
> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <richard.be...@gmail.com>
> wrote:
>
>> I've reset the backpressure to the default.
>>
>> This remains something of a mystery. The merge with synthetic data
>> happily creates flowfiles with 100 records, and the join says "Records
>> merged due to: Bin is full" or "Records merged due to: Bin is full enough".
>> No timeouts in that case, even with the max bin age at 4.5 seconds. The
>> resulting flowfiles were about 300K.
>>
>> The real data is doing much the same as before, producing flowfiles of
>> about 30K, with 7 records or so. If I increase the maximum bin age to 30
>> seconds the size and record count is higher - 12 to 15. Nothing like the
>> behaviour with synthetic data, where the 100 record flowfiles are created
>> almost instantly. Joins are always due to bin age.
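A rough way to reason about this: when bins close on age, each bin holds about min(max records, arrival rate × max bin age). The sketch below is a simplified model, not NiFi's actual scheduling: it checks a bin's age only when a record arrives and ignores Minimum Number of Records and the run schedule. A preloaded queue behaves like the burst case, while a trickle of records yields small, age-triggered bins.

```python
def simulate_bins(arrivals, max_records, max_bin_age):
    """Simplified bin-packing model; returns a list of (size, reason) tuples.

    arrivals: sorted arrival times in seconds.
    A bin is emitted when it reaches max_records ("full"), or when a record
    arrives after the open bin has reached max_bin_age ("max bin age").
    """
    bins = []
    size = 0
    opened_at = 0.0
    for t in arrivals:
        if size and t - opened_at >= max_bin_age:
            bins.append((size, "max bin age"))
            size = 0
        if size == 0:
            opened_at = t  # first record opens a new bin
        size += 1
        if size == max_records:
            bins.append((size, "full"))
            size = 0
    if size:  # the leftover bin would eventually expire
        bins.append((size, "max bin age"))
    return bins


# 250 records already queued: bins of 100, 100 and a 50-record remainder.
print(simulate_bins([0.0] * 250, max_records=100, max_bin_age=4.5))
# Two records per second: every bin closes on age with only 9 records.
print(simulate_bins([i * 0.5 for i in range(20)], max_records=100, max_bin_age=4.5))
```

For example, at about 1.5 records per second a 4.5-second bin holds 6 or 7 records, close to the sizes reported here; the timestamps in the debug dump quoted further down suggest records were reaching the bins at roughly that rate, though that is an inference from a short excerpt.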
>>
>> Can the problem relate to the structure of the Avro files? Is there any way
>> to dive into that? Everything else about the MergeRecord settings appears
>> the same, so I can't see an explanation as to why the behaviour is different
>> on the same hardware.
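To dive into the files themselves: every Avro object container file starts with the magic bytes `Obj\x01`, followed by a metadata map whose `avro.schema` entry holds the writer schema as JSON. A minimal standard-library sketch that reads that header, assuming well-formed input (there is no bounds checking):

```python
import json

MAGIC = b"Obj\x01"


def read_long(buf, pos):
    """Decode one Avro long (zig-zag varint) at buf[pos]; return (value, new_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def read_bytes(buf, pos):
    """Decode Avro bytes/string: a long length followed by that many bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length], pos + length


def read_header_metadata(data):
    """Return the file-header metadata map of an Avro object container file."""
    if data[:4] != MAGIC:
        raise ValueError("not an Avro object container file")
    pos = 4
    meta = {}
    while True:
        count, pos = read_long(data, pos)
        if count == 0:  # a zero count ends the map
            break
        if count < 0:  # negative count: the block's byte size follows; skip it
            count = -count
            _, pos = read_long(data, pos)
        for _ in range(count):
            key, pos = read_bytes(data, pos)
            value, pos = read_bytes(data, pos)
            meta[key.decode("utf-8")] = value
    return meta


def embedded_schema(data):
    """Parse the writer schema stored under the avro.schema metadata key."""
    return json.loads(read_header_metadata(data)["avro.schema"])
```

Reading one file from each flow (for example `embedded_schema(open(path, "rb").read())`) and diffing the two results shows whether the script's files carry a different schema from ConvertRecord's. The debug log quoted further down shows MergeRecord using the schema text as its Group ID, so schema differences are worth ruling out.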
>>
>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <marka...@hotmail.com> wrote:
>>
>>> Hey Richard,
>>>
>>> So a few things that I’ve done/looked at.
>>>
>>> I generated some Avro data (random JSON that I downloaded from a Random
>>> JSON Generator and then converted to Avro).
>>>
>>> I then ran this Avro data into both of the MergeRecord processors.
>>>
>>> Firstly, I noticed that both are very slow. I found that this was because
>>> Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for
>>> MergeRecord, and really for basically all processors except for the first
>>> one in the flow.
>>>
>>> I also noticed that you have backpressure set on your connections to
>>> 40,000 FlowFiles and 4 GB. This can significantly slow things down. If you
>>> have performance concerns you definitely want backpressure set back to the
>>> default of 10,000 FlowFiles. Otherwise, as the queues fill up they start
>>> “swapping out” FlowFiles to disk, and this can significantly slow things
>>> down.
>>>
>>> I noticed that MergeRecord is set to 1 concurrent task. Probably worth
>>> considering increasing that, if performance is a concern.
>>>
>>> That said, I am seeing nice, full bins of 100 records merged from each
>>> of the MergeRecord processors.
>>> So it is certainly possible that if you’re seeing smaller bins, it’s
>>> because you’re timing out. The 4.5-second timeout is quite short. Have you
>>> tried increasing that to, say, 30 seconds to see if it gives you larger bins?
>>> I also recommend that you take a look at the data provenance to see why
>>> it’s creating the bins.
>>>
>>> If unclear how to do that:
>>> 1. Right-click on the MergeRecord processor.
>>> 2. Go to View data provenance.
>>> 3. Scroll down the list until you see a “JOIN” event type. You can ignore
>>>    the ATTRIBUTES_MODIFIED and DROP events for now.
>>> 4. Click the ‘i’ icon on the left-hand side.
>>> 5. This will show you details about the merge. In the Details tab, if you
>>>    scroll down, it will show you a Details field, which tells you why the
>>>    data was merged. It should say either “Records Merged due to: Bin has
>>>    reached Max Bin Age” or “Records Merged due to: Bin is full”.
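With many JOIN events to inspect, tallying the Details text is quicker than clicking through each one. A minimal sketch, assuming the events have already been exported as dicts with `eventType` and `details` keys; that shape is an assumption, so adapt it to however you export provenance.

```python
from collections import Counter


def merge_reasons(events):
    """Tally the Details text of JOIN provenance events.

    events: iterable of dicts with "eventType" and "details" keys
    (assumed export shape; other event types are skipped).
    """
    return Counter(
        (event.get("details") or "").strip()
        for event in events
        if event.get("eventType") == "JOIN"
    )


events = [
    {"eventType": "JOIN", "details": "Records Merged due to: Bin has reached Max Bin Age"},
    {"eventType": "JOIN", "details": "Records Merged due to: Bin has reached Max Bin Age"},
    {"eventType": "JOIN", "details": "Records Merged due to: Bin is full"},
    {"eventType": "DROP", "details": None},
]
print(merge_reasons(events).most_common())
```

A result dominated by the Max Bin Age reason points to the remedies suggested next, rather than to a problem with the records themselves.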
>>>
>>> If it is due to Max Bin Age reached, then I’d recommend increasing
>>> number of concurrent tasks, reducing backpressure to no more than 10,000
>>> FlowFiles in the queue, and/or increasing the Max Bin Age.
>>> Also worth asking: what kind of machines is this running on? A 64-core
>>> VM with a 1 TB volume will, of course, run MUCH differently than a 4-core VM
>>> with a 10 GB volume, especially in the cloud.
>>>
>>> If still having trouble, let me know what the provenance tells you about
>>> the reason for merging the data, and we can go from there.
>>>
>>> Thanks!
>>> -Mark
>>>
>>>
>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <marka...@hotmail.com> wrote:
>>>
>>> Richard,
>>>
>>> I think just the flow structure should be sufficient.
>>>
>>> Thanks
>>> -Mark
>>>
>>>
>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <richard.be...@gmail.com>
>>> wrote:
>>>
>>> Thanks for responding,
>>> I re-tested with max bins = 2, but the behaviour remained the same. I
>>> can easily share a version of the functioning workflow (and data), which is
>>> part of a public project. The problem workflow (which shares many of the
>>> same components) is part of a health research project, so more difficult. I
>>> definitely can't share any data from that one. Do you need to see the data
>>> or is the overall structure sufficient at this point? Happy to demonstrate
>>> via video conference too.
>>>
>>> Thanks
>>>
>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <marka...@hotmail.com> wrote:
>>>
>>>> Hi Richard,
>>>>
>>>> Can you try increasing the Maximum Number of Bins? I think an issue was
>>>> recently addressed in which the merge processors misbehaved when Max
>>>> Number of Bins = 1.
>>>>
>>>> If you still see the same issue, please provide a copy of the flow that
>>>> can be used to replicate the issue.
>>>>
>>>> Thanks
>>>> -Mark
>>>>
>>>>
>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <richard.be...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Pretty much the same - I seem to end up with flowfiles containing about
>>>> 7 records, presumably always triggered by the timeout.
>>>>
>>>> I had thought the timeout needed to be less than the run schedule, but
>>>> it looks like it can be the same.
>>>>
>>>> Here's a debug dump:
>>>>
>>>> 10:13:43 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>>
>>>> 10:13:43 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>
>>>> 10:13:44 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>
>>>> 10:13:44 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>
>>>> 10:13:44 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
>>>>
>>>> 10:13:45 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
>>>>
>>>> 10:13:46 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>
>>>> 10:13:46 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:46 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>>> OutputStream using session StandardProcessSession[id=83204] for
>>>> RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:46 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:48 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:48 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:49 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>
>>>> 10:13:49 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:49 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
>>>>
>>>> 10:13:52 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
>>>>
>>>> 10:13:53 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>
>>>> 10:13:53 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:53 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>>> OutputStream using session StandardProcessSession[id=83205] for
>>>> RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:53 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:54 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:54 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:55 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>
>>>> 10:13:55 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:55 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
>>>>
>>>> 10:13:58 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
>>>> full=false, isComplete=false, id=4023] is now expired. Completing bin.
>>>>
>>>> 10:13:58 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete because
>>>> complete() was called
>>>>
>>>> 10:13:58 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>>> Writer using session StandardProcessSession[id=83205] for RecordBin[size=6,
>>>> full=false, isComplete=true, id=4023]
>>>>
>>>> 10:13:58 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with
>>>> Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948]
>>>> using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663,
>>>> id=1068800, id=1068845]
>>>>
>>>> 10:13:58 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>
>>>> 10:14:01 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
>>>>
>>>> 10:14:01 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>
>>>> 10:14:02 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>
>>>> 10:14:02 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>
>>>> 10:14:02 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
>>>>
>>>> 10:14:05 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
>>>> full=false, isComplete=false, id=4024] is now expired. Completing bin.
>>>>
>>>> 10:14:05 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete because
>>>> complete() was called
>>>>
>>>> 10:14:05 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>>> Writer using session StandardProcessSession[id=83206] for RecordBin[size=6,
>>>> full=false, isComplete=true, id=4024]
>>>>
>>>> 10:14:05 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with
>>>> Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d]
>>>> using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316,
>>>> id=1069451, id=1069492]
>>>>
>>>> 10:14:05 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>
>>>> 10:14:07 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
>>>>
>>>> 10:14:07 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>
>>>> 10:14:08 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>
>>>> 10:14:08 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>> id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>
>>>> 10:14:08 UTC
>>>> DEBUG
>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>
>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
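A dump like this is easier to read when summarized. The sketch below assumes the raw `nifi-app.log` text (one message per line, without the email quoting) and regular expressions written against the message wording quoted here, which may differ between NiFi versions. It extracts completed-bin sizes, the number of bins completed because they expired, and how many distinct Group IDs (schemas) appear. In the excerpt above the Group ID strings appear identical and the completed bins hold only 6 or 7 records.

```python
import re

# Patterns assume the DEBUG message wording shown in this thread.
COMPLETED = re.compile(r"Completed bin\s+RecordBin\[size=(\d+),")
EXPIRED = re.compile(
    r"RecordBin\[size=\d+,\s+full=\w+,\s+isComplete=\w+,\s+id=(\d+)\] is now expired"
)
GROUP_ID = re.compile(r"Got Group ID\s+(\{.*?\})\s+for FlowFile")


def summarize_merge_log(text):
    """Summarize MergeRecord DEBUG output: bin sizes, expiries, distinct groups."""
    return {
        "completed_bin_sizes": [int(n) for n in COMPLETED.findall(text)],
        "expired_bins": len(EXPIRED.findall(text)),
        "distinct_group_ids": len(set(GROUP_ID.findall(text))),
    }
```

More than one distinct Group ID would mean records are being split across bins by schema; a single Group ID with mostly expired bins points back to arrival rate and Max Bin Age.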
>>>>
>>>>
>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <joe.w...@gmail.com> wrote:
>>>>
>>>>> Hello
>>>>>
>>>>> Run schedule should be 0.
>>>>>
>>>>> 50 should be the min number of records
>>>>>
>>>>> 5 seconds is the max bin age it sounds like you want.
>>>>>
>>>>> Start with these changes and let us know what you're seeing.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <richard.be...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I'm having a great deal of trouble configuring the MergeRecord
>>>>>> processor to deliver reasonable performance, and I'm not sure where to
>>>>>> look to correct it. One of my upstream processors requires a single
>>>>>> record per flowfile, but I'd like to create larger flowfiles before
>>>>>> passing them to the next stage. The flowfiles are independent at this
>>>>>> stage, so no special processing is required for the merge. I'd like to
>>>>>> create flowfiles of about 50 to 100 records.
>>>>>>
>>>>>> I have two tests, both running on the same NiFi system. One uses
>>>>>> synthetic data, the other the production data. The performance of the
>>>>>> MergeRecord processor for the synthetic data is as I'd expect, and I
>>>>>> can't figure out why the production data is so much slower. Here's the
>>>>>> configuration:
>>>>>>
>>>>>> MergeRecord has the following settings: timer driven, 1 concurrent
>>>>>> task, 5 second run schedule, bin packing merge strategy, min records = 1,
>>>>>> max records = 100, max bin age = 4.5 secs, maximum number of bins = 1.
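The advice given in the replies above (run schedule 0, more than one bin, a minimum record count near the target size, more concurrent tasks, backpressure no higher than 10,000 FlowFiles) can be written down as a quick check of these settings. The dictionary keys are hypothetical names, not NiFi property keys, and the "minimum at least half the maximum" rule is a rule of thumb derived from Joe's 50/100 suggestion.

```python
def check_merge_settings(settings):
    """Return warnings for the MergeRecord settings discussed in this thread.

    settings: dict with hypothetical keys run_schedule_secs, concurrent_tasks,
    min_records, max_records, max_bins and backpressure_flowfiles.
    """
    warnings = []
    if settings["run_schedule_secs"] != 0:
        warnings.append("Run Schedule should be 0 sec for MergeRecord")
    if settings["max_bins"] <= 1:
        warnings.append("Maximum Number of Bins = 1 has been problematic; use more")
    if settings["min_records"] < settings["max_records"] // 2:
        warnings.append("Minimum Number of Records is far below the target bin size")
    if settings["concurrent_tasks"] == 1:
        warnings.append("consider more than 1 concurrent task")
    if settings["backpressure_flowfiles"] > 10_000:
        warnings.append("backpressure above 10,000 FlowFiles can cause swapping")
    return warnings


original = dict(run_schedule_secs=5, concurrent_tasks=1, min_records=1,
                max_records=100, max_bins=1, backpressure_flowfiles=40_000)
for warning in check_merge_settings(original):
    print(warning)
```

The settings listed here, combined with the 40,000-FlowFile backpressure Mark noticed on the connections, trigger all five warnings.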
>>>>>>
>>>>>> In the case of synthetic data the typical flowfile size is in the
>>>>>> range 2 to 7KB.
>>>>>>
>>>>>> The size of flowfiles for the production case is smaller - typically
>>>>>> around 1KB.
>>>>>>
>>>>>> The structure in the tests is slightly different. Synthetic is (note
>>>>>> that I've removed the text part):
>>>>>>
>>>>>> [ {
>>>>>>   "sampleid" : 1075,
>>>>>>   "typeid" : 98,
>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>   "document" : "Text removed - typically a few hundred words",
>>>>>>   "docid" : "9"
>>>>>> } ]
>>>>>>
>>>>>> Production is:
>>>>>> [ {
>>>>>>   "doc_id" : "5.60622895E8",
>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>   "metadata_x_parsed_by" :
>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>   "metadata_page_count" : null,
>>>>>>   "metadata_creation_date" : null,
>>>>>>   "metadata_last_modified" : null
>>>>>> } ]
>>>>>>
>>>>>> I load up the queue feeding the MergeRecord processor with several
>>>>>> hundred individual flowfiles and activate it.
>>>>>>
>>>>>> The synthetic data is nicely placed into chunks of 100, with any
>>>>>> remainder being flushed in a smaller chunk.
>>>>>>
>>>>>> The production data is generally bundled into groups of 6 records,
>>>>>> sometimes fewer. Certainly it never gets close to 100 records.
>>>>>>
>>>>>> Any ideas as to what I should look at to track down the difference?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>
>>>>
>>>
>>>
