> Is there something about this structure that is likely to be causing the 
> problem? Could there be other issues with the avro generated by the script?

I don’t think the structure should matter. And as long as the avro produced is 
proper Avro, I don’t think it should matter. Unless perhaps there’s some issue 
with the Avro library itself that’s causing it to take a really long time to 
parse the Avro or something? I’d be curious - if you take the output of your 
script and then you run it through a ConvertRecord (Avro Reader -> Json Writer) 
is the ConvertRecord fast? Or is it really slow to process it?

On Dec 5, 2022, at 5:58 AM, Richard Beare <richard.be...@gmail.com> wrote:

Further - I performed another test in which I replaced the custom json to avro 
script with a ConvertRecord processor - merge record appears to work as 
expected in that case.

Output of convertrecord looks like this:

[ {
  "text" : "  No Alert Found \n\n",
  "metadata" : {
    "X_TIKA_Parsed_By" : null,
    "X_OCR_Applied" : null,
    "Content_Type" : null
  },
  "success" : true,
  "timestamp" : "2022-12-05T10:49:18.568Z",
  "processingElapsedTime" : 0,
  "doc_id" : "5.60178607E8"
} ]

while the output of the script looks like:

[ {
  "doc_id" : "5.61996505E8",
  "doc_text" : "  No Alert Found \n\n",
  "processing_timestamp" : "2022-11-28T01:16:46.775Z",
  "metadata_x_ocr_applied" : true,
  "metadata_x_parsed_by" : 
"org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
  "metadata_content_type" : "application/rtf",
  "metadata_page_count" : null,
  "metadata_creation_date" : null,
  "metadata_last_modified" : null
} ]

Is there something about this structure that is likely to be causing the 
problem? Could there be other issues with the avro generated by the script?
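One way to make the structural difference between the two outputs concrete is to compare their field paths. The following is a minimal sketch, not part of the original flow: the two records are copied from the samples above, and `flatten_keys` is a hypothetical helper introduced only for illustration.

```python
def flatten_keys(record, prefix=""):
    """Return the set of dotted field paths in a (possibly nested) record."""
    paths = set()
    for key, value in record.items():
        path = f"{prefix}{key}"
        if isinstance(value, dict):
            # Recurse into nested records such as "metadata".
            paths |= flatten_keys(value, prefix=path + ".")
        else:
            paths.add(path)
    return paths


# Record as produced by ConvertRecord (copied from the sample above).
convert_record_row = {
    "text": "  No Alert Found \n\n",
    "metadata": {
        "X_TIKA_Parsed_By": None,
        "X_OCR_Applied": None,
        "Content_Type": None,
    },
    "success": True,
    "timestamp": "2022-12-05T10:49:18.568Z",
    "processingElapsedTime": 0,
    "doc_id": "5.60178607E8",
}

# Record as produced by the custom script (copied from the sample above).
script_row = {
    "doc_id": "5.61996505E8",
    "doc_text": "  No Alert Found \n\n",
    "processing_timestamp": "2022-11-28T01:16:46.775Z",
    "metadata_x_ocr_applied": True,
    "metadata_x_parsed_by": (
        "org.apache.tika.parser.DefaultParser;"
        "org.apache.tika.parser.microsoft.rtf.RTFParser;"
        "org.apache.tika.parser.AutoDetectParser"
    ),
    "metadata_content_type": "application/rtf",
    "metadata_page_count": None,
    "metadata_creation_date": None,
    "metadata_last_modified": None,
}

convert_paths = flatten_keys(convert_record_row)
script_paths = flatten_keys(script_row)
shared = convert_paths & script_paths

print("shared fields:      ", sorted(shared))
print("only ConvertRecord: ", sorted(convert_paths - script_paths))
print("only custom script: ", sorted(script_paths - convert_paths))
```

This only shows that the two layouts differ (nested `metadata` versus flattened `metadata_*` fields); it says nothing about whether that difference is what affects MergeRecord.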

On Mon, Dec 5, 2022 at 9:31 PM Richard Beare 
<richard.be...@gmail.com<mailto:richard.be...@gmail.com>> wrote:
I've reset the backpressure to the default

This remains something of a mystery. The merge with synthetic data happily 
creates flowfiles with 100 records, and the join says "Records merged due to: 
Bin is full" or "Records merged due to: Bin is full enough". No timeouts in 
that case, even with the max bin age at 4.5 seconds. The resulting flowfiles 
were about 300K.

The real data is doing much the same as before, producing flowfiles of about 
30K, with 7 records or so. If I increase the maximum bin age to 30 seconds the 
size and record count is higher - 12 to 15. Nothing like the behaviour with 
synthetic data, where the 100 record flowfiles are created almost instantly. 
Joins are always due to bin age.
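To illustrate why slow intake leads to joins that are always due to bin age, here is a deliberately simplified model of bin packing. It is a sketch under stated assumptions, not NiFi's implementation: records are assumed to reach the bin at a fixed interval, and the 0.7 second interval is a made-up value chosen for illustration, not a measurement from this flow.

```python
def simulate_bins(arrival_interval_s, n_records, max_records=100, max_bin_age_s=4.5):
    """Return the record count of each bin under a simplified model.

    A bin opens when its first record arrives. It closes either when it
    holds max_records, or when the next record arrives after the bin has
    reached max_bin_age_s.
    """
    bins = []
    bin_opened_at = None
    count = 0
    for i in range(n_records):
        now = i * arrival_interval_s
        # Close the current bin if it has aged out before this record arrived.
        if count and now - bin_opened_at >= max_bin_age_s:
            bins.append(count)
            count = 0
        if count == 0:
            bin_opened_at = now
        count += 1
        # Close the bin as soon as it is full.
        if count == max_records:
            bins.append(count)
            count = 0
    if count:
        bins.append(count)  # flush the remainder
    return bins


# Hypothetical rates: records reaching the bin every 10 ms versus every 700 ms.
fast = simulate_bins(arrival_interval_s=0.01, n_records=300)
slow = simulate_bins(arrival_interval_s=0.7, n_records=21)

print("fast intake:", fast)
print("slow intake:", slow)
```

In this model the fast case fills every bin to 100 records, while the slow case never holds more than 7 records before the 4.5 second age limit closes the bin. The model does not explain why records would reach the bin slowly for one data set and not the other.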

Can the problem relate to the structure of the avro files? Any way to dive into 
that? Everything else about the mergerecord settings appears the same, so I 
can't see an explanation as to why the behaviour is different on the same 
hardware.

On Mon, Dec 5, 2022 at 2:09 AM Mark Payne 
<marka...@hotmail.com<mailto:marka...@hotmail.com>> wrote:
Hey Richard,

So a few things that I’ve done/looked at.

I generated some Avro data (random JSON that I downloaded from a Random JSON 
Generator and then converted to Avro).

I then ran this avro data into both the MergeRecord processors.

Firstly, I noticed that both are very slow. Found that was because Run Schedule 
was set to 5 seconds. This should *ALWAYS* be 0 secs for MergeRecord. And 
really for basically all processors except for the first one in the flow.

I also notice that you have backpressure set on your connections to 40,000 
FlowFiles and 4 GB. This can significantly slow things down. If you have 
performance concerns you definitely want backpressure set back to the default 
of 10,000 FlowFiles. Otherwise, as the queues fill up they start “swapping out” 
FlowFiles to disk, and this can significantly slow things down.

I noticed that MergeRecord is set to 1 concurrent task. Probably worth 
considering increasing that, if performance is a concern.

That said, I am seeing nice, full bins of 100 records merged from each of the 
MergeRecord processors.
So it is certainly possible that if you’re seeing smaller bins it’s because 
you’re timing out. The 4.5-second timeout is quite short. Have you tried 
increasing that to, say, 30 seconds to see if it gives you larger bins?
I also recommend that you take a look at the data provenance to see why it’s 
creating the bins.

If unclear how to do that:
Right-click on the MergeRecord processor
Go to View data provenance
Scroll down the list until you see a “JOIN” event type. You can ignore the 
ATTRIBUTES_MODIFIED and DROP events for now.
Click the ‘i’ icon on the left-hand side.
This will show you details about the merge. In the Details tab, if you scroll 
down, it will show you a Details field, which tells you why the data was 
merged. It should either say: “Records Merged due to: Bin has reached Max Bin 
Age” or “Records Merged due to: Bin is full”
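If there are many JOIN events, it can help to tally the reasons rather than read them one by one. The sketch below assumes the Details values have been copied out of the provenance view by hand into a list; the `sample_details` list and the `tally_merge_reasons` helper are hypothetical and only for illustration.

```python
import re
from collections import Counter

# Matches the reason text regardless of the capitalisation of "merged".
REASON_PATTERN = re.compile(r"records merged due to:\s*(.+)", re.IGNORECASE)


def tally_merge_reasons(details_values):
    """Count how often each merge reason appears in a list of Details strings."""
    counts = Counter()
    for details in details_values:
        match = REASON_PATTERN.search(details)
        reason = match.group(1).strip() if match else "unknown"
        counts[reason] += 1
    return counts


# Hypothetical sample, typed in from the Details field of three JOIN events.
sample_details = [
    "Records Merged due to: Bin has reached Max Bin Age",
    "Records merged due to: Bin is full",
    "Records Merged due to: Bin has reached Max Bin Age",
]

reason_counts = tally_merge_reasons(sample_details)
for reason, count in reason_counts.most_common():
    print(f"{count:3d}  {reason}")
```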

If it is due to Max Bin Age reached, then I’d recommend increasing number of 
concurrent tasks, reducing backpressure to no more than 10,000 FlowFiles in the 
queue, and/or increasing the Max Bin Age.
Also worth asking - what kind of machines is this running on? A 64 core VM with 
1 TB volume will, of course, run MUCH differently than a 4 core VM with a 10 GB 
volume, especially in the cloud.

If still having trouble, let me know what the provenance tells you about the 
reason for merging the data, and we can go from there.

Thanks!
-Mark


On Dec 3, 2022, at 4:38 PM, Mark Payne 
<marka...@hotmail.com<mailto:marka...@hotmail.com>> wrote:

Richard,

I think just the flow structure should be sufficient.

Thanks
-Mark


On Dec 3, 2022, at 4:32 PM, Richard Beare 
<richard.be...@gmail.com<mailto:richard.be...@gmail.com>> wrote:

Thanks for responding,
I re-tested with max bins = 2, but the behaviour remained the same. I can 
easily share a version of the functioning workflow (and data), which is part of 
a public project. The problem workflow (which shares many of the same 
components) is part of a health research project, so more difficult. I 
definitely can't share any data from that one. Do you need to see the data or 
is the overall structure sufficient at this point? Happy to demonstrate via 
video conference too.

Thanks

On Sun, Dec 4, 2022 at 1:37 AM Mark Payne 
<marka...@hotmail.com<mailto:marka...@hotmail.com>> wrote:
Hi Richard,

Can you try increasing the Maximum Number of Bins? I think there was an issue 
that was recently addressed in which the merge processors had an issue when Max 
Number of Bins = 1.

If you still see the same issue, please provide a copy of the flow that can be 
used to replicate the issue.

Thanks
-Mark


On Dec 3, 2022, at 5:21 AM, Richard Beare 
<richard.be...@gmail.com<mailto:richard.be...@gmail.com>> wrote:

Hi,

Pretty much the same - I seem to end up with flowfiles containing about 7 
records, presumably always triggered by the timeout.

I had thought the timeout needed to be less than the run schedule, but it looks 
like it can be the same.

Here's a debug dump

10:13:43 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066297 to 
RecordBin[size=4, full=false, isComplete=false, id=4021]

10:13:43 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066297 to 
RecordBin[size=5, full=false, isComplete=false, id=4021]

10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]

10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066372 to 
RecordBin[size=5, full=false, isComplete=false, id=4021]

10:13:44 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066372 to 
RecordBin[size=6, full=false, isComplete=false, id=4021]

10:13:45 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066575 to 
RecordBin[size=7, full=false, isComplete=true, id=4021]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=9e9908f6-b28e-4615-b6c8-4bd163a3bc00]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066612 to 
RecordBin[size=0, full=false, isComplete=false, id=4022]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using 
session StandardProcessSession[id=83204] for RecordBin[size=0, full=false, 
isComplete=false, id=4022]

10:13:46 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066612 to 
RecordBin[size=1, full=false, isComplete=false, id=4022]

10:13:48 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1066896 to 
RecordBin[size=2, full=false, isComplete=false, id=4022]

10:13:48 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1066896 to 
RecordBin[size=3, full=false, isComplete=false, id=4022]

10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]

10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067051 to 
RecordBin[size=3, full=false, isComplete=false, id=4022]

10:13:49 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067051 to 
RecordBin[size=4, full=false, isComplete=false, id=4022]

10:13:52 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067254 to 
RecordBin[size=7, full=false, isComplete=true, id=4022]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1067395 to 
RecordBin[size=0, full=false, isComplete=false, id=4023]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Created OutputStream using 
session StandardProcessSession[id=83205] for RecordBin[size=0, full=false, 
isComplete=false, id=4023]

10:13:53 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1067395 to 
RecordBin[size=1, full=false, isComplete=false, id=4023]

10:13:54 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068472 to 
RecordBin[size=1, full=false, isComplete=false, id=4023]

10:13:54 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068472 to 
RecordBin[size=2, full=false, isComplete=false, id=4023]

10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]

10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1068597 to 
RecordBin[size=2, full=false, isComplete=false, id=4023]

10:13:55 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068597 to 
RecordBin[size=3, full=false, isComplete=false, id=4023]

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, 
full=false, isComplete=false, id=4023] is now expired. Completing bin.

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, 
full=false, isComplete=true, id=4023] as complete because complete() was called

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using 
session StandardProcessSession[id=83205] for RecordBin[size=6, full=false, 
isComplete=true, id=4023]

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin 
RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with 
Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using 
input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800, 
id=1068845]

10:13:58 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1068845 to 
RecordBin[size=6, full=false, isComplete=true, id=4023]

10:14:01 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069272 to 
RecordBin[size=2, full=false, isComplete=false, id=4024]

10:14:01 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069272 to 
RecordBin[size=3, full=false, isComplete=false, id=4024]

10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=b7f4498d-647a-46d1-ad9f-badaed8591f8]

10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1069316 to 
RecordBin[size=3, full=false, isComplete=false, id=4024]

10:14:02 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069316 to 
RecordBin[size=4, full=false, isComplete=false, id=4024]

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6, 
full=false, isComplete=false, id=4024] is now expired. Completing bin.

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Marked RecordBin[size=6, 
full=false, isComplete=true, id=4024] as complete because complete() was called

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Closed Record Writer using 
session StandardProcessSession[id=83206] for RecordBin[size=6, full=false, 
isComplete=true, id=4024]

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin 
RecordBin[size=6, full=false, isComplete=true, id=4024] with 6 records with 
Merged FlowFile FlowFile[filename=6c13e518-655b-4507-ad6c-d37f6b9c0a5d] using 
input FlowFiles [id=1069044, id=1069103, id=1069272, id=1069316, id=1069451, 
id=1069492]

10:14:05 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1069492 to 
RecordBin[size=6, full=false, isComplete=true, id=4024]

10:14:07 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072118 to 
RecordBin[size=2, full=false, isComplete=false, id=4025]

10:14:07 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072118 to 
RecordBin[size=3, full=false, isComplete=false, id=4025]

10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Got Group ID 
{"type":"record","name":"document","fields":[{"name":"doc_id","type":"string"},{"name":"doc_text","type":"string","default":""},{"name":"processing_timestamp","type":{"type":"string","logicalType":"timestamp-millis"}},{"name":"metadata_x_ocr_applied","type":"boolean"},{"name":"metadata_x_parsed_by","type":"string"},{"name":"metadata_content_type","type":["null","string"],"default":null},{"name":"metadata_page_count","type":["null","int"],"default":null},{"name":"metadata_creation_date","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null},{"name":"metadata_last_modified","type":["null",{"type":"string","logicalType":"timestamp-millis"}],"default":null}]}
 for FlowFile[filename=7d4f7a2b-ea59-4b9c-a7d6-df035fa3856e]

10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Migrating id=1072197 to 
RecordBin[size=3, full=false, isComplete=false, id=4025]

10:14:08 UTC
DEBUG
99ae16aa-0184-1000-8ccc-ee1f2ee77ca1

MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Transferred id=1072197 to 
RecordBin[size=4, full=false, isComplete=false, id=4025]
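A dump like this is easier to read once the completed bins are pulled out. The following is a minimal sketch that assumes the log text has been saved as plain text with the same wording as above; the regular expressions are written against these sample lines only and may not match other NiFi versions. The `summarize_bins` helper is hypothetical.

```python
import re

# "Completed bin RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records"
COMPLETED = re.compile(
    r"Completed bin RecordBin\[size=\d+, full=(true|false), "
    r"isComplete=\w+, id=(\d+)\] with (\d+) records"
)
# "RecordBin[size=6, full=false, isComplete=false, id=4023] is now expired"
EXPIRED = re.compile(
    r"RecordBin\[size=\d+, full=\w+, isComplete=\w+, id=(\d+)\] is now expired"
)


def summarize_bins(log_text):
    """Return one summary dict per completed bin found in the log text."""
    # The email wrapped long log lines, so collapse all whitespace first.
    text = " ".join(log_text.split())
    expired_ids = set(EXPIRED.findall(text))
    summary = []
    for full, bin_id, records in COMPLETED.findall(text):
        summary.append(
            {
                "bin_id": bin_id,
                "records": int(records),
                "full": full == "true",
                "expired": bin_id in expired_ids,
            }
        )
    return summary


# Excerpt copied from the dump above (bin 4023).
excerpt = """
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] RecordBin[size=6,
full=false, isComplete=false, id=4023] is now expired. Completing bin.
MergeRecord[id=99ae16aa-0184-1000-8ccc-ee1f2ee77ca1] Completed bin
RecordBin[size=6, full=false, isComplete=true, id=4023] with 6 records with
Merged FlowFile FlowFile[filename=6824b503-82b9-444e-a77e-9b081e878948] using
input FlowFiles [id=1067395, id=1068472, id=1068597, id=1068663, id=1068800,
id=1068845]
"""

bins = summarize_bins(excerpt)
for entry in bins:
    print(entry)
```

For the excerpt this reports a single bin, 4023, completed with 6 records, not full, and flagged as expired, which matches the "is now expired" line in the dump.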


On Sat, Dec 3, 2022 at 4:21 PM Joe Witt 
<joe.w...@gmail.com<mailto:joe.w...@gmail.com>> wrote:
Hello

Run schedule should be 0.

50 should be the min number of records

5 seconds is the max bin age it sounds like you want.

Start with these changes and let us know what you're seeing.

Thanks

On Fri, Dec 2, 2022 at 10:12 PM Richard Beare 
<richard.be...@gmail.com<mailto:richard.be...@gmail.com>> wrote:
Hi,
I'm having a great deal of trouble configuring the mergerecord processor to 
deliver reasonable performance and I'm not sure where to look to correct it. 
One of my upstream processors requires a single record per flowfile, but I'd 
like to create larger flowfiles before passing to the next stage. The flowfiles 
are independent at this stage so there's no special processing required of the 
merging. I'd like to create flowfiles of about 50 to 100 records.

I have two tests, both running on the same nifi system. One uses synthetic 
data, the other the production data. The performance of the mergerecord 
processor for the synthetic data is as I'd expect, and I can't figure out why 
the production data is so much slower. Here's the configuration:

mergerecord has the following settings. Timer driven, 1 concurrent task, 5 
second run schedule, bin packing merge strategy, min records = 1, max records = 
100, max bin age = 4.5 secs, maximum number of bins = 1.

In the case of synthetic data the typical flowfile size is in the range 2 to 
7KB.

The size of flowfiles for the production case is smaller - typically around 1KB.

The structure in the tests is slightly different. Synthetic is (note that I've 
removed the text part):

[ {
  "sampleid" : 1075,
  "typeid" : 98,
  "dct" : "2020-01-25T21:40:25.515Z",
  "filename" : "__tmp/txt/mtsamples-type-98-sample-1075.txt",
  "document" : "Text removed - typically a few hundred words",
  "docid" : "9"
} ]

Production is:
[ {
  "doc_id" : "5.60622895E8",
  "doc_text" : " Text deleted - typically a few hundred words",
  "processing_timestamp" : "2022-11-27T23:56:35.601Z",
  "metadata_x_ocr_applied" : true,
  "metadata_x_parsed_by" : 
"org.apache.tika.parser.DefaultParser;org.apache.tika.parser.microsoft.rtf.RTFParser;org.apache.tika.parser.AutoDetectParser",
  "metadata_content_type" : "application/rtf",
  "metadata_page_count" : null,
  "metadata_creation_date" : null,
  "metadata_last_modified" : null
} ]



I load up the queue feeding the mergerecord processor with several hundred 
individual flowfiles and activate it.

The synthetic data is nicely placed into chunks of 100, with any remainder 
being flushed in a smaller chunk.

The production data is generally bundled into groups of 6 records, sometimes 
fewer. It certainly never gets close to 100 records.

Any ideas as to what I should look at to track down the difference?

Thanks



