It should be okay to create a buffer like that, assuming the FlowFile is small. 
Typically we try to avoid buffering the content of a FlowFile into memory. But 
if it’s a reasonably small FlowFile, that’s probably fine.

To be honest, if the issue is intermittent and doesn’t always happen on the 
same input, it sounds like a threading/concurrency bug. Do you have a buffer or 
anything like that as a member variable?
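
A minimal sketch of the pattern being asked about, written in plain Java rather than against the NiFi API. The class and method names are hypothetical, and it only assumes that one processor instance may be called from several threads when Concurrent Tasks is greater than 1. It contrasts a buffer held in a member variable with one that is local to each call:

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a processor class; this is not the NiFi API.
class BufferScopeSketch {

    // RISKY: a single field shared by every thread that calls into this instance.
    private byte[] sharedBuffer;

    // Unsafe variant, for contrast only. Between the assignment and the return,
    // another thread may point sharedBuffer at its own array, so bytes from two
    // inputs can end up mixed or misattributed.
    byte[] readShared(InputStream in, int size) throws IOException {
        sharedBuffer = new byte[size];
        in.readNBytes(sharedBuffer, 0, size);
        return sharedBuffer;
    }

    // Safer variant: the buffer is local to the call, so each thread gets its own.
    static byte[] readLocal(InputStream in, int size) throws IOException {
        final byte[] buffer = new byte[size];
        if (in.readNBytes(buffer, 0, size) != size) {
            throw new EOFException("stream ended before " + size + " bytes were read");
        }
        return buffer;
    }

    // Reads `tasks` distinct payloads on 4 threads and checks every result.
    static boolean allMatch(int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final byte[] payload = ("record-" + i).getBytes(StandardCharsets.UTF_8);
                Callable<Boolean> task = () -> Arrays.equals(
                        payload,
                        readLocal(new ByteArrayInputStream(payload), payload.length));
                results.add(pool.submit(task));
            }
            for (Future<Boolean> result : results) {
                if (!result.get()) {
                    return false;
                }
            }
            return true;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("all payloads intact: " + allMatch(200));
    }
}
```

The same reasoning would apply to any per-invocation state kept in a field, not only byte arrays.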

On Jun 22, 2020, at 10:02 PM, Jason Iannone 
<bread...@gmail.com> wrote:

I'm now thinking it's due to how we handled reading the flowfile content into a 
buffer.

Previous:
session.read(flowFile, in -> {
  atomicVessel.set(ByteStreams.toByteArray(in));
});

Current:
final byte[] buffer = new byte[(int) flowFile.getSize()];
session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));

Making this change reduced the occurrences of the data corruption, but we still 
saw it occur. What I'm now wondering is if sizing the byte array based on 
flowFile.getSize() is ideal? The contents of the file are raw bytes coming from 
ConsumeKafka_2_0.
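
For reference, a minimal plain-Java sketch of reading exactly a declared number of bytes into a buffer sized from a long. It is not NiFi code: `readExactly` and `declaredSize` are hypothetical names standing in for the read callback and the value of flowFile.getSize(). It shows two things a hand-rolled version has to cover: a checked long-to-int conversion (a plain `(int)` cast silently truncates sizes above 2 GB) and a loop, since a single read() may return fewer bytes than requested.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper; names are illustrative and not part of NiFi.
class FillBufferSketch {

    static byte[] readExactly(InputStream in, long declaredSize) throws IOException {
        // Throws ArithmeticException instead of silently truncating large sizes.
        final byte[] buffer = new byte[Math.toIntExact(declaredSize)];
        int filled = 0;
        while (filled < buffer.length) {
            // read() may return fewer bytes than requested, so keep going.
            int n = in.read(buffer, filled, buffer.length - filled);
            if (n < 0) {
                throw new EOFException(
                        "expected " + buffer.length + " bytes but stream ended after " + filled);
            }
            filled += n;
        }
        return buffer;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4, 5};
        byte[] copy = readExactly(new ByteArrayInputStream(data), data.length);
        System.out.println("bytes read: " + copy.length);
    }
}
```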

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:51 PM Mark Payne 
<marka...@hotmail.com> wrote:
Jason,

Glad to hear it. This is where the data provenance becomes absolutely 
invaluable. So now you should be able to trace the lineage of that FlowFile 
back to the start using data provenance. You can see exactly what it looked 
like when it was received. If it looks wrong there, the provenance shows 
exactly where it was received from so you know where to look next. If it looks 
good on receipt, you can trace the data through the flow and see exactly what 
the data looked like before and after each processor. And when you see which 
processor resulted in corruption, you can easily download the data as it looks 
when it went into the processor to make it easy to re-ingest and test.

Thanks
-Mark


On Jun 22, 2020, at 4:46 PM, Jason Iannone 
<bread...@gmail.com> wrote:

I spoke too soon; it must be the magic of sending an email! We found what 
appears to be corrupted content and captured the binary, hoping to play it 
through the code and see what's going on.

Thanks,
Jason

On Mon, Jun 22, 2020 at 4:35 PM Jason Iannone 
<bread...@gmail.com> wrote:
Hey Mark,

We hit the issue again, and when digging into the lineage we can see the 
content is fine coming into MergeContent but is corrupt on output of Join. Any 
other suggestions?

Thanks,
Jason

On Wed, Jun 10, 2020 at 2:26 PM Mark Payne 
<marka...@hotmail.com> wrote:
Jason,

Control characters should not cause any problem with MergeContent. MergeContent 
just copies bytes from one stream to another. It’s also worth noting that 
attributes don’t really come into play here. MergeContent is combining the 
FlowFile content, so even if it has some weird attributes, those won’t cause a 
problem in the output content. NiFi stores attributes as a mapping of String to 
String key/value pairs (i.e., Map<String, String>). So the processor is 
assuming that if you want to convert a message header to an attribute, that 
header must be a string.

Content in the repository is stored using “slabs” or “blocks.” One processor at 
a time has the opportunity to write to a file in the content repository. When 
the processor finishes writing and transfers the FlowFile to the next 
processor, NiFi keeps track of which file its content was written to, the byte 
offset where its content starts, and the length of the content. The next time 
that a processor needs to write to the content of a FlowFile, it may end up 
appending to that same file on disk, but the FlowFile that the content 
corresponds to will keep track of the byte offset into the file where its 
content begins and how many bytes in that file belong to that FlowFile.
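
As a rough illustration of that bookkeeping, here is a minimal plain-Java sketch. The `Claim` class and `read` method are hypothetical and are not NiFi's actual classes; a byte array stands in for the shared file on disk. Each FlowFile is modeled as a (container, offset, length) triple, and a reader returns only the bytes inside that range:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration; these are not NiFi's real content repository classes.
class ContentClaimSketch {

    // Where one FlowFile's content lives inside a shared container file.
    static final class Claim {
        final String container; // which file on disk holds the content
        final int offset;       // byte offset where this FlowFile's content starts
        final int length;       // number of bytes that belong to this FlowFile

        Claim(String container, int offset, int length) {
            this.container = container;
            this.offset = offset;
            this.length = length;
        }
    }

    // Returns only the bytes of the container that belong to the given claim.
    static byte[] read(byte[] containerBytes, Claim claim) {
        if (claim.offset < 0 || claim.length < 0
                || claim.offset + claim.length > containerBytes.length) {
            throw new IllegalArgumentException("claim lies outside the container");
        }
        return Arrays.copyOfRange(containerBytes, claim.offset, claim.offset + claim.length);
    }

    public static void main(String[] args) {
        // Two FlowFiles appended back to back into the same container.
        byte[] container = "{\"name\":\"1\"}{\"name\":\"2\"}".getBytes(StandardCharsets.UTF_8);
        Claim first = new Claim("container-1", 0, 12);
        Claim second = new Claim("container-1", 12, 12);
        System.out.println(new String(read(container, first), StandardCharsets.UTF_8));
        System.out.println(new String(read(container, second), StandardCharsets.UTF_8));
    }
}
```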

My recommendation to track this down would be to find a FlowFile that is 
corrupt, and then use the data provenance feature [1] to view its lineage. Look 
at the FlowFiles that were joined together by MergeContent and see if any of 
those is corrupt.

Thanks
-Mark

[1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance

On Jun 10, 2020, at 2:07 PM, Jason Iannone 
<bread...@gmail.com> wrote:

Hey Mark,

I was thinking this over some more. Despite no complaints from Jackson's 
ObjectMapper, is it possible that hidden and/or control characters are present 
in the JSON values, which would then cause MergeContent to behave this way? I 
looked over the code and nothing jumped out, but there is something we had to 
do because of how the publisher is setting Kafka header attributes. Some 
attributes are raw bytes, not strings converted to bytes, and ConsumeKafka seems 
to assume that these can always be converted to a String. We had to change the 
encoding to ISO8859 because we were running into issues with the bytes getting 
corrupted.

I'm also trying to better understand how the content is being stored in the 
content repository, and whether something is going wrong when writing it out.

Thanks,
Jason

On Tue, Jun 9, 2020 at 8:02 PM Mark Payne 
<marka...@hotmail.com> wrote:
Hey Jason,

Thanks for reaching out. That is definitely odd and not something that I’ve 
seen or heard about before.

Are you certain that the data is not being corrupted upstream of the processor? 
I ask because the code for the processor that handles writing out the content 
is pretty straightforward and hasn’t been modified in over 3 years, so I would 
expect to see it happen often if it were a bug in the MergeContent processor 
itself. Any chance that you can create a flow template/sample data that 
recreates the issue? Anything particularly unique about your flow?

Thanks
-Mark


> On Jun 9, 2020, at 6:47 PM, Jason Iannone 
> <bread...@gmail.com> wrote:
>
> Hi all,
>
> Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent. The 
> processor is being fed many flowfiles with individual JSON records. The 
> records have various field types including a hex-encoded byte[]. We are not 
> trying to merge JSON records themselves but rather consolidate many flowfiles 
> into fewer flowfiles.
>
> What we're seeing is that a random flowfile is split, causing the merged file 
> to be invalid JSON. When running multiple bins we saw the flowfile split 
> across bins.
>
> Example
> Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
> Flowfile 2:  {"name": "2", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> Flowfile 3:  {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>
> Merged Result:
> {"name": "1", "hexbyters": A10F15B11D14", timestamp: "123456789" }
> xbytes": A10F15D14B11", timestamp: "123456790" }
> {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
> {"name": "3", "h
>
> MergeContent Configuration:
> Concurrent Tasks: 4
> Merge Strategy: Bin-Packing Algorithm
> Merge Format: Binary Concatenation
> Attribute Strategy: Keep Only Common Attributes
> Min. number of entries: 1000
> Max number of entries: 20000
> Minimum group size: 10 KB
> Maximum number of bins: 5
> Header, Footer, and Demarcator are not set.
>
> We then backed off the settings above, reducing min and max entries and 
> setting bins to 1 and threads to 1, and still see the same issue.
>
> Any insights?
>
> Thanks,
> Jason



