Hi Everyone,
I'm still struggling to fix this issue and may need to try a different
approach.

What is the recommended way of transforming a record structure? At the
moment I have a groovy script doing this but the downstream processing is
very slow, as discussed in the preceding thread.

The transformation is very simple - starting structure is:

{
  "result" : {
    "text" : " document text",
    "metadata" : {
      "X-TIKA:Parsed-By" : [
        "org.apache.tika.parser.pdf.PDFParser"
      ],
      "X-OCR-Applied" : true,
      "dcterms:created" : "2018-07-24T15:04:51Z",
      "Content-Type" : "application/pdf",
      "Page-Count" : 2
    },
    "success" : true,
    "timestamp" : "2022-12-20T09:02:27.902Z",
    "processingElapsedTime" : 6
  }
}


The final structure is:

[ {
  "doc_id" : 58,
  "doc_text" : "   ",
  "processing_timestamp" : "2022-12-20T09:02:27.902Z",
  "metadata_x_ocr_applied" : true,
  "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
  "metadata_content_type" : "application/pdf",
  "metadata_page_count" : 1,
  "metadata_creation_date" : null,
  "metadata_last_modified" : null
} ]

So a kind of flattening of the structure. Is there a processor I should be
using to do this instead of a groovy script?
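
To make the mapping concrete, here is a rough Python sketch of the
flattening (the real implementation is the groovy script linked further
down; this is only an illustration). The function name
flatten_tika_result is made up, doc_id is assumed to arrive separately
(for example from a flowfile attribute), and mapping dcterms:created and
dcterms:modified onto the two date fields is my assumption. The ';' join
for multiple parsers matches what the production records show.

```python
import json


def flatten_tika_result(doc_id, tika_output):
    """Flatten the nested Tika result into the flat record layout.

    Hypothetical helper for illustration only; field sources marked
    'assumed' are guesses, not taken from the actual script.
    """
    result = tika_output["result"]
    metadata = result.get("metadata") or {}

    # Tika reports the parser chain as a list; the flat record holds one string.
    parsed_by = metadata.get("X-TIKA:Parsed-By")
    if isinstance(parsed_by, list):
        parsed_by = ";".join(parsed_by)

    return {
        "doc_id": doc_id,  # assumed to come from outside the Tika output
        "doc_text": result.get("text", ""),
        "processing_timestamp": result.get("timestamp"),
        "metadata_x_ocr_applied": metadata.get("X-OCR-Applied"),
        "metadata_x_parsed_by": parsed_by,
        "metadata_content_type": metadata.get("Content-Type"),
        "metadata_page_count": metadata.get("Page-Count"),
        "metadata_creation_date": metadata.get("dcterms:created"),    # assumed
        "metadata_last_modified": metadata.get("dcterms:modified"),   # assumed
    }


example = {
    "result": {
        "text": " document text",
        "metadata": {
            "X-TIKA:Parsed-By": ["org.apache.tika.parser.pdf.PDFParser"],
            "X-OCR-Applied": True,
            "Content-Type": "application/pdf",
            "Page-Count": 2,
        },
        "success": True,
        "timestamp": "2022-12-20T09:02:27.902Z",
        "processingElapsedTime": 6,
    }
}

# One flat record per input document, wrapped in a list like the target output.
print(json.dumps([flatten_tika_result(58, example)], indent=2))
```

Missing metadata keys simply come out as null, which is what the
nullable fields in the target structure expect.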

Thanks

On Wed, Dec 14, 2022 at 7:57 AM Richard Beare <richard.be...@gmail.com>
wrote:

> Any thoughts on this? Are there some extra steps required when creating an
> avro file from a user defined schema?
>
> On Thu, Dec 8, 2022 at 2:56 PM Richard Beare <richard.be...@gmail.com>
> wrote:
>
>> Here's another result that I think suggests there's something wrong with
>> the avro files created by the groovy script, although I can't see what the
>> problem might be.
>>
>> The test is as follows. Output of the groovy script creating avro files
>> is passed to convertrecord, configured with an avro reader and json writer.
>> This is slow. The json output is then converted back to avro with another
>> convertrecord processor, configured with a jsontreereader and an avro
>> writer - this is fast, instantly emptying the queue. The result of that is
>> fed into the previously problematic merge processor which works exactly as
>> expected, producing flowfiles with 100 records each.
>>
>> The difference I can see between the two flow files is the way in which
>> the schema is specified. Perhaps some extras are required in the groovy
>> file to set that up?
>>
>> The slow one has:
>>
>> {"type":"record","name":"document", "fields":[{
>>
>> The fast one
>>
>> {"type":"record","name":"nifiRecord","namespace":"org.apache.nifi","fields":
>>
>>
>> Initial characters are also slightly different.
>> Slow one:
>>
>> 0000000   O   b   j 001 002 026   a   v   r   o   .   s   c   h   e   m
>> 0000020   a 346  \n   {   "   t   y   p   e   "   :   "   r   e   c   o
>>
>> Fast one
>>
>> 0000000   O   b   j 001 004 026   a   v   r   o   .   s   c   h   e   m
>> 0000020   a 362  \b   {   "   t   y   p   e   "   :   "   r   e   c   o
>>
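
Reading those leading bytes against the Avro object container file
layout (magic "Obj" 0x01, then a metadata map whose counts and lengths
are zigzag varints) suggests the slow file carries one metadata entry
and the fast one two, with first values of 691 and 569 bytes. Below is a
small Python sketch of that decoding. The byte strings are transcribed
from the od output above, the helper names are made up, and it only
handles a positive map block count.

```python
def read_long(buf, pos):
    """Decode one Avro long (zigzag varint) at buf[pos]; return (value, next_pos)."""
    raw = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last varint byte
            break
        shift += 7
    return (raw >> 1) ^ -(raw & 1), pos


def describe_header(prefix):
    """Summarise the start of an Avro container file header (hypothetical helper)."""
    if prefix[:4] != b"Obj\x01":
        raise ValueError("not an Avro object container file")
    entry_count, pos = read_long(prefix, 4)      # entries in the first map block
    key_len, pos = read_long(prefix, pos)        # length of the first key
    key = prefix[pos:pos + key_len].decode("utf-8")
    pos += key_len
    value_len, pos = read_long(prefix, pos)      # length of the first value
    return {
        "metadata_entries": entry_count,
        "first_key": key,
        "first_value_bytes": value_len,
    }


# od -c shows octal: 002/004 -> 0x02/0x04, 026 -> 0x16, 346 \n -> 0xE6 0x0A,
# 362 \b -> 0xF2 0x08.
SLOW_PREFIX = b"Obj\x01\x02\x16avro.schema\xe6\x0a"
FAST_PREFIX = b"Obj\x01\x04\x16avro.schema\xf2\x08"

print(describe_header(SLOW_PREFIX))
print(describe_header(FAST_PREFIX))
```

Both files start with the avro.schema key. The dump does not show what
the second metadata entry in the fast file is.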
>>
>> The groovy script is
>> CogStack-NiFi/parse-tika-result-json-to-avro.groovy at master ·
>> CogStack/CogStack-NiFi · GitHub
>> <https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-scripts/parse-tika-result-json-to-avro.groovy>
>>
>> The schema is
>> CogStack-NiFi/document.avsc at master · CogStack/CogStack-NiFi · GitHub
>> <https://github.com/CogStack/CogStack-NiFi/blob/master/nifi/user-schemas/document.avsc>
>>
>>
>> On Thu, Dec 8, 2022 at 1:59 PM Richard Beare <richard.be...@gmail.com>
>> wrote:
>>
>>> I'm diving into the convertrecord tests a bit deeper on the production
>>> server.
>>>
>>> The first test case - 259 documents, total of 1M when in avro format in
>>> the input queue to the convert record processor. These avro files were not
>>> created by the groovy script - they start life as a database query and the
>>> text field is in one of the columns. The convertrecord processor runs very
>>> quickly - click start, press refresh and it is done. The avro ends up like
>>> this:
>>>
>>> [ {
>>>   "sampleid" : 1075,
>>>   "typeid" : 98,
>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>   "document" : "Text removed",
>>>   "docid" : "9"
>>> } ]
>>>
>>> In the second test, where the text fields are extracted from pdf tika
>>> before avro files are created by the groovy script (from the tika json
>>> output), the total queue size for the 259 documents is larger - 1.77MB, and
>>> the performance is very different - press start, click refresh and only two
>>> flowfiles are processed.
>>>
>>> [ {
>>>   "doc_id" : "70",
>>>   "doc_text" : "text removed",
>>>   "processing_timestamp" : "2022-12-07T23:09:52.354Z",
>>>   "metadata_x_ocr_applied" : true,
>>>   "metadata_x_parsed_by" : "org.apache.tika.parser.pdf.PDFParser",
>>>   "metadata_content_type" : "application/pdf",
>>>   "metadata_page_count" : 1,
>>>   "metadata_creation_date" : null,
>>>   "metadata_last_modified" : null
>>> } ]
>>>
>>> I've noticed that the second one has a content.type attribute of
>>> 'application/json' which doesn't seem right and doesn't match the fast
>>> case. I'll see what happens if I change that.
>>>
>>> On Thu, Dec 8, 2022 at 9:39 AM Richard Beare <richard.be...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>> Some progress on debugging options. I've found a flow that exhibits the
>>>> problem using synthetic data. However the results are host dependent. On my
>>>> laptop a "run-once" click of merge record gives me two flowfiles of 100
>>>> records, while the same flow on the production server produces several much
>>>> smaller flowfiles. This makes me think that something funny is happening
>>>> with my storage setup that I'll need to chase.
>>>>
>>>> I had tested the convertrecord option (simply avroreader->avrowriter)
>>>> and it did seem slow, but I'll investigate this further as it may be
>>>> related to my storage issue.
>>>>
>>>>
>>>>
>>>> On Thu, Dec 8, 2022 at 1:23 AM Mark Payne <marka...@hotmail.com> wrote:
>>>>
>>>>> > Is there something about this structure that is likely to be causing
>>>>> the problem? Could there be other issues with the avro generated by the
>>>>> script?
>>>>>
>>>>> I don’t think the structure should matter. And as long as the avro
>>>>> produced is proper Avro, I don’t think it should matter. Unless perhaps
>>>>> there’s some issue with the Avro library itself that’s causing it to take
>>>>> a really long time to parse the Avro or something? I’d be curious - if you
>>>>> take the output of your script and then you run it through a ConvertRecord
>>>>> (Avro Reader -> Json Writer) is the ConvertRecord fast? Or is it really
>>>>> slow to process it?
>>>>>
>>>>> On Dec 5, 2022, at 5:58 AM, Richard Beare <richard.be...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Further - I performed another test in which I replaced the custom json
>>>>> to avro script with a ConvertRecord processor - merge record appears to
>>>>> work as expected in that case.
>>>>>
>>>>> Output of convertrecord looks like this:
>>>>>
>>>>> [ {
>>>>>   "text" : "  No Alert Found \n\n",
>>>>>   "metadata" : {
>>>>>     "X_TIKA_Parsed_By" : null,
>>>>>     "X_OCR_Applied" : null,
>>>>>     "Content_Type" : null
>>>>>   },
>>>>>   "success" : true,
>>>>>   "timestamp" : "2022-12-05T10:49:18.568Z",
>>>>>   "processingElapsedTime" : 0,
>>>>>   "doc_id" : "5.60178607E8"
>>>>> } ]
>>>>>
>>>>> while the output of the script looks like:
>>>>>
>>>>> [ {
>>>>>   "doc_id" : "5.61996505E8",
>>>>>   "doc_text" : "  No Alert Found \n\n",
>>>>>   "processing_timestamp" : "2022-11-28T01:16:46.775Z",
>>>>>   "metadata_x_ocr_applied" : true,
>>>>>   "metadata_x_parsed_by" :
>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>   "metadata_content_type" : "application/rtf",
>>>>>   "metadata_page_count" : null,
>>>>>   "metadata_creation_date" : null,
>>>>>   "metadata_last_modified" : null
>>>>> } ]
>>>>>
>>>>> Is there something about this structure that is likely to be causing
>>>>> the problem? Could there be other issues with the avro generated by the
>>>>> script?
>>>>>
>>>>> On Mon, Dec 5, 2022 at 9:31 PM Richard Beare <richard.be...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I've reset the backpressure to the default
>>>>>>
>>>>>> This remains something of a mystery. The merge with synthetic data
>>>>>> happily creates flowfiles with 100 records, and the join says "Records
>>>>>> merged due to: Bin is full" or "Records merged due to: Bin is full 
>>>>>> enough".
>>>>>> No timeouts in that case, even with the max bin age at 4.5 seconds. The
>>>>>> resulting flowfiles were about 300K.
>>>>>>
>>>>>> The real data is doing much the same as before, producing flowfiles
>>>>>> of about 30K, with 7 records or so. If I increase the maximum bin age to
>>>>>> 30 seconds the size and record count is higher - 12 to 15. Nothing like the
>>>>>> behaviour with synthetic data, where the 100 record flowfiles are created
>>>>>> almost instantly. Joins are always due to bin age.
>>>>>>
>>>>>> Can the problem relate to the structure of the avro files? Any way to
>>>>>> dive into that? Everything else about the mergerecord settings appears the
>>>>>> same, so I can't see an explanation as to why the behaviour is different
>>>>>> on the same hardware.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 5, 2022 at 2:09 AM Mark Payne <marka...@hotmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Richard,
>>>>>>>
>>>>>>> So a few things that I’ve done/looked at.
>>>>>>>
>>>>>>> I generated some Avro data (random JSON that I downloaded from a
>>>>>>> Random JSON Generator and then converted to Avro).
>>>>>>>
>>>>>>> I then ran this avro data into both the MergeRecord processors.
>>>>>>>
>>>>>>> Firstly, I noticed that both are very slow. Found that was because
>>>>>>> Run Schedule was set to 5 seconds. This should *ALWAYS* be 0 secs for
>>>>>>> MergeRecord. And really for basically all processors except for the
>>>>>>> first one in the flow.
>>>>>>>
>>>>>>> I also notice that you have backpressure set on your connections to
>>>>>>> 40,000 FlowFiles and 4 GB. This can significantly slow things down. If
>>>>>>> you have performance concerns you definitely want backpressure set back to
>>>>>>> the default of 10,000 FlowFiles. Otherwise, as the queues fill up they start
>>>>>>> “swapping out” FlowFiles to disk, and this can significantly slow things
>>>>>>> down.
>>>>>>>
>>>>>>> I noticed that MergeRecord is set to 1 concurrent task. Probably
>>>>>>> worth considering increasing that, if performance is a concern.
>>>>>>>
>>>>>>> That said, I am seeing nice, full bins of 100 records merged from
>>>>>>> each of the MergeRecord processors.
>>>>>>> So it is certainly possible that if you’re seeing smaller bins it’s
>>>>>>> because you’re timing out. The 4.5 seconds timeout is quite short. Have
>>>>>>> you tried increasing that to say 30 seconds to see if it gives you larger
>>>>>>> bins?
>>>>>>> I also recommend that you take a look at the data provenance to see
>>>>>>> why it’s creating the bins.
>>>>>>>
>>>>>>> If unclear how to do that:
>>>>>>> Right-click on the MergeRecord processor
>>>>>>> Go to View data provenance
>>>>>>> Scroll down the list until you see a “JOIN” event type. You can
>>>>>>> ignore the ATTRIBUTES_MODIFIED and DROP events for now.
>>>>>>> Click the ‘i’ icon on the left-hand side.
>>>>>>> This will show you details about the merge. In the Details tab, if
>>>>>>> you scroll down, it will show you a Details field, which tells you why
>>>>>>> the data was merged. It should either say: “Records Merged due to: Bin has
>>>>>>> reached Max Bin Age” or “Records Merged due to: Bin is full”
>>>>>>>
>>>>>>> If it is due to Max Bin Age reached, then I’d recommend increasing
>>>>>>> number of concurrent tasks, reducing backpressure to no more than 10,000
>>>>>>> FlowFiles in the queue, and/or increasing the Max Bin Age.
>>>>>>> Also worth asking - what kind of machines is this running on? A 64
>>>>>>> core VM with 1 TB volume will, of course, run MUCH differently than a 4
>>>>>>> core VM with a 10 GB volume, especially in the cloud.
>>>>>>>
>>>>>>> If still having trouble, let me know what the provenance tells you
>>>>>>> about the reason for merging the data, and we can go from there.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> -Mark
>>>>>>>
>>>>>>>
>>>>>>> On Dec 3, 2022, at 4:38 PM, Mark Payne <marka...@hotmail.com> wrote:
>>>>>>>
>>>>>>> Richard,
>>>>>>>
>>>>>>> I think just the flow structure should be sufficient.
>>>>>>>
>>>>>>> Thanks
>>>>>>> -Mark
>>>>>>>
>>>>>>>
>>>>>>> On Dec 3, 2022, at 4:32 PM, Richard Beare <richard.be...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Thanks for responding,
>>>>>>> I re-tested with max bins = 2, but the behaviour remained the same.
>>>>>>> I can easily share a version of the functioning workflow (and data),
>>>>>>> which is part of a public project. The problem workflow (which shares many
>>>>>>> of the same components) is part of a health research project, so more
>>>>>>> difficult. I definitely can't share any data from that one. Do you need to
>>>>>>> see the data or is the overall structure sufficient at this point? Happy to
>>>>>>> demonstrate via video conference too.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> On Sun, Dec 4, 2022 at 1:37 AM Mark Payne <marka...@hotmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Richard,
>>>>>>>>
>>>>>>>> Can you try increasing the Maximum Number of Bins? I think there
>>>>>>>> was an issue that was recently addressed in which the merge processors
>>>>>>>> had an issue when Max Number of Bins = 1.
>>>>>>>>
>>>>>>>> If you still see the same issue, please provide a copy of the flow
>>>>>>>> that can be used to replicate the issue.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> -Mark
>>>>>>>>
>>>>>>>>
>>>>>>>> On Dec 3, 2022, at 5:21 AM, Richard Beare <richard.be...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Pretty much the same - I seem to end up with flowfiles containing
>>>>>>>> about 7 records, presumably always triggered by the timeout.
>>>>>>>>
>>>>>>>> I had thought the timeout needed to be less than the run schedule,
>>>>>>>> but it looks like it can be the same.
>>>>>>>>
>>>>>>>> Here's a debug dump
>>>>>>>>
>>>>>>>> 10:13:43 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1066297 to RecordBin[size=4, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:43 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1066297 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:44 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>>
>>>>>>>> 10:13:44 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1066372 to RecordBin[size=5, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:44 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1066372 to RecordBin[size=6, full=false, isComplete=false, id=4021]
>>>>>>>>
>>>>>>>> 10:13:45 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1066575 to RecordBin[size=7, full=false, isComplete=true, id=4021]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1066612 to RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>>>>>>> OutputStream using session StandardProcessSession[id=83204] for
>>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:46 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1066612 to RecordBin[size=1, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:48 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1066896 to RecordBin[size=2, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:48 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1066896 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:49 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>>
>>>>>>>> 10:13:49 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1067051 to RecordBin[size=3, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:49 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1067051 to RecordBin[size=4, full=false, isComplete=false, id=4022]
>>>>>>>>
>>>>>>>> 10:13:52 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1067254 to RecordBin[size=7, full=false, isComplete=true, id=4022]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1067395 to RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created
>>>>>>>> OutputStream using session StandardProcessSession[id=83205] for
>>>>>>>> RecordBin[size=0, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:53 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1067395 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:54 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1068472 to RecordBin[size=1, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:54 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1068472 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:55 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>>
>>>>>>>> 10:13:55 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1068597 to RecordBin[size=2, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:55 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1068597 to RecordBin[size=3, full=false, isComplete=false, id=4023]
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired.
>>>>>>>> Completing bin.
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] as complete
>>>>>>>> because complete() was called
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>>>>>>> Writer using session StandardProcessSession[id=83205] for
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records
>>>>>>>> with Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948]
>>>>>>>> using input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663,
>>>>>>>> id=1068800, id=1068845]
>>>>>>>>
>>>>>>>> 10:13:58 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1068845 to RecordBin[size=6, full=false, isComplete=true, id=4023]
>>>>>>>>
>>>>>>>> 10:14:01 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1069272 to RecordBin[size=2, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:01 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1069272 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:02 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]
>>>>>>>>
>>>>>>>> 10:14:02 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1069316 to RecordBin[size=3, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:02 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1069316 to RecordBin[size=4, full=false, isComplete=false, id=4024]
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1]
>>>>>>>> RecordBin[size=6, full=false, isComplete=false, id=4024] is now expired.
>>>>>>>> Completing bin.
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] as complete
>>>>>>>> because complete() was called
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record
>>>>>>>> Writer using session StandardProcessSession[id=83206] for
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
>>>>>>>> RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records
>>>>>>>> with Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d]
>>>>>>>> using input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316,
>>>>>>>> id=1069451, id=1069492]
>>>>>>>>
>>>>>>>> 10:14:05 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1069492 to RecordBin[size=6, full=false, isComplete=true, id=4024]
>>>>>>>>
>>>>>>>> 10:14:07 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1072118 to RecordBin[size=2, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> 10:14:07 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1072118 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> 10:14:08 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID
>>>>>>>> {"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
>>>>>>>> for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]
>>>>>>>>
>>>>>>>> 10:14:08 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating
>>>>>>>> id=1072197 to RecordBin[size=3, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>> 10:14:08 UTC
>>>>>>>> DEBUG
>>>>>>>> 99ae16aa-0184-1000-8ccc-ee1f2ee77ca1
>>>>>>>>
>>>>>>>> MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred
>>>>>>>> id=1072197 to RecordBin[size=4, full=false, isComplete=false, id=4025]
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Dec 3, 2022 at 4:21 PM Joe Witt <joe.w...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello
>>>>>>>>>
>>>>>>>>> Run schedule should be 0.
>>>>>>>>>
>>>>>>>>> 50 should be the min number of records
>>>>>>>>>
>>>>>>>>> 5 seconds is the max bin age it sounds like you want.
>>>>>>>>>
>>>>>>>>> Start with these changes and let us know what you're seeing.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> On Fri, Dec 2, 2022 at 10:12 PM Richard Beare <
>>>>>>>>> richard.be...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I'm having a great deal of trouble configuring the mergerecord
>>>>>>>>>> processor to deliver reasonable performance and I'm not sure where
>>>>>>>>>> to look to correct it. One of my upstream processors requires a single
>>>>>>>>>> record per flowfile, but I'd like to create larger flowfiles before
>>>>>>>>>> passing to the next stage. The flowfiles are independent at this stage so
>>>>>>>>>> there's no special processing required of the merging. I'd like to create
>>>>>>>>>> flowfiles of about 50 to 100 records.
>>>>>>>>>>
>>>>>>>>>> I have two tests, both running on the same nifi system. One uses
>>>>>>>>>> synthetic data, the other the production data. The performance of the
>>>>>>>>>> mergerecord processor for the synthetic data is as I'd expect, and I
>>>>>>>>>> can't figure out why the production data is so much slower. Here's the
>>>>>>>>>> configuration:
>>>>>>>>>>
>>>>>>>>>> mergerecord has the following settings. Timer driven, 1
>>>>>>>>>> concurrent task, 5 second run schedule, bin packing merge strategy,
>>>>>>>>>> min records = 1, max records = 100, max bin age = 4.5 secs, maximum
>>>>>>>>>> number of bins = 1.
>>>>>>>>>>
>>>>>>>>>> In the case of synthetic data the typical flowfile size is in the
>>>>>>>>>> range 2 to 7KB.
>>>>>>>>>>
>>>>>>>>>> The size of flowfiles for the production case is smaller -
>>>>>>>>>> typically around 1KB.
>>>>>>>>>>
>>>>>>>>>> The structure in the tests is slightly different. Synthetic is
>>>>>>>>>> (note that I've removed the text part):
>>>>>>>>>>
>>>>>>>>>> [ {
>>>>>>>>>>   "sampleid" : 1075,
>>>>>>>>>>   "typeid" : 98,
>>>>>>>>>>   "dct" : "2020-01-25T21:40:25.515Z",
>>>>>>>>>>   "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
>>>>>>>>>>   "document" : "Text removed - typically a few hundred words",
>>>>>>>>>>   "docid" : "9"
>>>>>>>>>> } ]
>>>>>>>>>>
>>>>>>>>>> Production is:
>>>>>>>>>> [ {
>>>>>>>>>>   "doc_id" : "5.60622895E8",
>>>>>>>>>>   "doc_text" : " Text deleted - typically a few hundred words",
>>>>>>>>>>   "processing_timestamp" : "2022-11-27T23:56:35.601Z",
>>>>>>>>>>   "metadata_x_ocr_applied" : true,
>>>>>>>>>>   "metadata_x_parsed_by" :
>>>>>>>>>> "org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
>>>>>>>>>>   "metadata_content_type" : "application/rtf",
>>>>>>>>>>   "metadata_page_count" : null,
>>>>>>>>>>   "metadata_creation_date" : null,
>>>>>>>>>>   "metadata_last_modified" : null
>>>>>>>>>> } ]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I load up the queue feeding the mergerecord processor with
>>>>>>>>>> several hundred individual flowfiles and activate it.
>>>>>>>>>>
>>>>>>>>>> The synthetic data is nicely placed into chunks of 100, with any
>>>>>>>>>> remainder being flushed in a smaller chunk.
>>>>>>>>>>
>>>>>>>>>> The production data is generally bundled into groups of 6
>>>>>>>>>> records, sometimes less. Certainly it never gets close to 100 
>>>>>>>>>> records.
>>>>>>>>>>
>>>>>>>>>> Any ideas as to what I should look at to track down the
>>>>>>>>>> difference?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
