Jason

Content repo writing isn't related to ZooKeeper.

You mention a fork of NiFi for ConsumeKafka. Have you tried using the stock
components, albeit without whatever feature you needed, so you can narrow in
on the problem?

joe

On Fri, Jul 3, 2020 at 7:16 AM Jason Iannone <bread...@gmail.com> wrote:

> What role does ZooKeeper have in persistence of flowfile content, and does
> this relate to how MergeContent works or how it checks that content exists?
> From all observations it appears that merge content is picking up data that
> is either not written or not completely written to disk.
>
> Thanks,
> Jason
>
> On Tue, Jun 30, 2020 at 10:48 PM Darren Govoni <dar...@ontrenet.com>
> wrote:
>
>> Run the nifi jvm in a runtime profiler/analyzer like appdynamics and see
>> if it detects any memory leaks or dangling unclosed file buffers/io.
>> Throwing darts but the problem could be as deep as the Linux kernel or
>> confined inside the jvm for your specific scenario.
>>
>> ------------------------------
>> *From:* Jason Iannone <bread...@gmail.com>
>> *Sent:* Tuesday, June 30, 2020 10:36:02 PM
>> *To:* users@nifi.apache.org <users@nifi.apache.org>
>> *Subject:* Re: MergeContent resulting in corrupted JSON
>>
>> Previous spotting of the issue was a red herring. We removed our custom
>> code and are still facing random "org.codehaus.jackson.JsonParseException:
>> Illegal Character" during PutDatabaseRecord due to a flowfile containing
>> malformed JSON post MergeContent. The error never occurs immediately; it
>> usually appears once we've processed several million records. We did a NOOP run,
>> which was ConsumeKafka -> UpdateCounter and everything seemed ok.
>>
>> Here's the current form of the flow:
>>
>>    1. ConsumeKafka_2_0 - Encoding headers as ISO-8859-1 due to some
>>    containing binary data
>>       1. I have a fork of nifi with changes to allow base64 and hex
>>       encoding of select nifi headers.
>>       2. Next test will be without pulling any headers
>>    2. RouteOnAttribute - Validate attributes
>>    3. Base64EncodeContent - Content is binary, converting to a format we
>>    can store to later process
>>    4. ExtractText - Copy Base64 encoded content to attribute
>>    5. AttributesToJson - Provenance shows output as being fine
>>    6. MergeContent - Provenance shows output of malformed JSON being
>>    written in the combined flowfile.
>>    7. PutDatabaseRecord - Schema specified as Schema Text
>>
>> Since we've removed all traces of custom code, what are people's thoughts
>> on possible causes? Could this be an OS issue, or are there any known
>> issues with specific versions of RHEL?
>>
>> Logically I think it makes sense to remove JSON from the equation as a
>> whole.
>>
>> Thanks,
>> Jason
>>
>> On Wed, Jun 24, 2020 at 2:54 PM Jason Iannone <bread...@gmail.com> wrote:
>>
>> Exactly my thought, and we've been combing through the code but nothing
>> significant has jumped out. What does stand out are the NiFi JIRAs
>> NIFI-6923, NIFI-6924, and NIFI-6846. Considering we're on 1.10.0, I've
>> requested upgrading to 1.11.4.
>>
>> Thanks,
>> Jason
>>
>> On Tue, Jun 23, 2020 at 9:05 AM Mark Payne <marka...@hotmail.com> wrote:
>>
>> It should be okay to create a buffer like that, assuming the FlowFile is
>> small. Typically we try to avoid buffering the content of a FlowFile into
>> memory. But if it’s a reasonably small FlowFile, that’s probably fine.
>>
>> To be honest, if the issue is intermittent and doesn’t always happen on
>> the same input, it sounds like a threading/concurrency bug. Do you have a
>> buffer or anything like that as a member variable?
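
To make the member-variable question concrete, here is a minimal sketch in plain Java of how a buffer held as a field can mix content from two invocations. Everything in it (SharedBufferSketch, loadIntoMember, emitFromMember, copyWithLocalBuffer) is a hypothetical name for illustration, not NiFi API and not the code under discussion. The interleaving is simulated sequentially so the result is repeatable; with real concurrent tasks the same overlap would happen only occasionally.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; none of these names come from NiFi.
final class SharedBufferSketch {
    // Hazard: a single buffer shared by every invocation on this instance.
    private final byte[] memberBuffer = new byte[16];

    // Step 1 of an imagined two-step trigger: copy content into the shared buffer.
    void loadIntoMember(String content) {
        byte[] bytes = content.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(bytes, 0, memberBuffer, 0, bytes.length);
    }

    // Step 2: emit the first `length` bytes of the shared buffer.
    String emitFromMember(int length) {
        return new String(memberBuffer, 0, length, StandardCharsets.US_ASCII);
    }

    // Safer variant: the buffer is local to the call, so nothing is shared.
    static String copyWithLocalBuffer(String content) {
        byte[] local = content.getBytes(StandardCharsets.US_ASCII);
        return new String(local, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        SharedBufferSketch processor = new SharedBufferSketch();
        // Simulated interleaving: task A loads 12 bytes, task B loads 8 bytes,
        // then task A emits its 12 bytes from the now-overwritten buffer.
        processor.loadIntoMember("{\"name\":\"1\"}");
        processor.loadIntoMember("{\"id\":2}");
        System.out.println(processor.emitFromMember(12)); // prints {"id":2}"1"}
        System.out.println(copyWithLocalBuffer("{\"name\":\"1\"}")); // prints {"name":"1"}
    }
}
```

The emitted record starts with one task's bytes and ends with another's, which is the kind of symptom a shared field would produce.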
>>
>> On Jun 22, 2020, at 10:02 PM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> I'm now thinking it's due to how we handled reading the flowfile content
>> into a buffer.
>>
>> Previous:
>> session.read(flowFile, in -> {
>>   atomicVessel.set(ByteStreams.toByteArray(in));
>> });
>>
>> Current:
>> final byte[] buffer = new byte[(int) flowFile.getSize()];
>> session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));
>>
>> Making this change reduced the occurrences of the data corruption, but we
>> still saw it occur. What I'm now wondering is if sizing the byte array
>> based on flowFile.getSize() is ideal? The contents of the file are raw
>> bytes coming from ConsumeKafka_2_0.
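
On the sizing question above: a buffer sized up front only ends up complete if the read loops until it is full, because a single InputStream.read call may return fewer bytes than requested. The sketch below shows that loop in plain Java. FillBufferSketch, fill, and trickle are hypothetical names; this is not NiFi's StreamUtils and makes no claim about how it is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration only; not NiFi's StreamUtils.
final class FillBufferSketch {
    // Keep reading until the buffer is full; fail loudly if the stream ends early.
    static int fill(InputStream in, byte[] buffer) {
        int total = 0;
        try {
            while (total < buffer.length) {
                int n = in.read(buffer, total, buffer.length - total);
                if (n < 0) {
                    throw new EOFException("stream ended after " + total + " bytes");
                }
                total += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }

    // Test stream that returns at most `chunk` bytes per read call.
    static InputStream trickle(byte[] data, int chunk) {
        return new ByteArrayInputStream(data) {
            @Override
            public synchronized int read(byte[] b, int off, int len) {
                return super.read(b, off, Math.min(len, chunk));
            }
        };
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        byte[] once = new byte[data.length];
        // A single read is allowed to stop short of the requested length.
        System.out.println(trickle(data, 3).read(once, 0, once.length)); // prints 3
        byte[] full = new byte[data.length];
        System.out.println(fill(trickle(data, 3), full)); // prints 10
    }
}
```

If the declared size and the stream length ever disagree, the loop throws instead of silently handing back a partly filled array.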
>>
>> Thanks,
>> Jason
>>
>> On Mon, Jun 22, 2020 at 4:51 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>> Jason,
>>
>> Glad to hear it. This is where the data provenance becomes absolutely
>> invaluable. So now you should be able to trace the lineage of that FlowFile
>> back to the start using data provenance. You can see exactly what it looked
>> like when it was received. If it looks wrong there, the provenance shows
>> exactly where it was received from so you know where to look next. If it
>> looks good on receipt, you can trace the data through the flow and see
>> exactly what the data looked like before and after each processor. And when
>> you see which processor resulted in corruption, you can easily download the
>> data as it looks when it went into the processor to make it easy to
>> re-ingest and test.
>>
>> Thanks
>> -Mark
>>
>>
>> On Jun 22, 2020, at 4:46 PM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> I spoke too soon; it must be the magic of sending an email! We found
>> what appears to be corrupted content and captured the binary, hoping to
>> play it through the code and see what's going on.
>>
>> Thanks,
>> Jason
>>
>> On Mon, Jun 22, 2020 at 4:35 PM Jason Iannone <bread...@gmail.com> wrote:
>>
>> Hey Mark,
>>
>> We hit the issue again, and when digging into the lineage we can see the
>> content is fine coming into MergeContent but is corrupt on output of Join.
>> Any other suggestions?
>>
>> Thanks,
>> Jason
>>
>> On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>> Jason,
>>
>> Control characters should not cause any problem with MergeContent.
>> MergeContent just copies bytes from one stream to another. It’s also worth
>> noting that attributes don’t really come into play here. MergeContent is
>> combining the FlowFile content, so even if it has some weird attributes,
>> those won’t cause a problem in the output content. NiFi stores attributes
>> as a mapping of String to String key/value pairs (i.e., Map<String,
>> String>). So the processor is assuming that if you want to convert a
>> message header to an attribute, that header must be a string.
>>
>> Content in the repository is stored using “slabs” or “blocks.” One
>> processor at a time has the opportunity to write to a file in the content
>> repository. When the processor finishes writing and transfers the FlowFile
>> to the next processor, NiFi keeps track of which file its content was
>> written to, the byte offset where its content starts, and the length of the
>> content. The next time that a processor needs to write to the content of a
>> FlowFile, it may end up appending to that same file on disk, but the
>> FlowFile that the content corresponds to will keep track of the byte offset
>> into the file where its content begins and how many bytes in that file
>> belong to that FlowFile.
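
The offset-and-length bookkeeping described above can be sketched in a few lines of plain Java. ContentClaimSketch and Claim are hypothetical names and an in-memory stream stands in for the file on disk; this is not NiFi's content repository code. The last step shows, purely as an illustration and not as a diagnosis, what a reader would get if an offset were off by a few bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration only; not NiFi's content repository implementation.
final class ContentClaimSketch {
    // Where one FlowFile's bytes live inside the shared container.
    static final class Claim {
        final int offset;
        final int length;

        Claim(int offset, int length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // In-memory stand-in for one shared file in the content repository.
    private final ByteArrayOutputStream container = new ByteArrayOutputStream();

    // Append content and remember where it starts and how long it is.
    Claim append(byte[] content) {
        int offset = container.size();
        container.write(content, 0, content.length);
        return new Claim(offset, content.length);
    }

    // Read back exactly the bytes that belong to the given claim.
    byte[] read(Claim claim) {
        byte[] all = container.toByteArray();
        return Arrays.copyOfRange(all, claim.offset, claim.offset + claim.length);
    }

    public static void main(String[] args) {
        ContentClaimSketch repo = new ContentClaimSketch();
        Claim first = repo.append("{\"name\": \"1\"}".getBytes(StandardCharsets.UTF_8));
        Claim second = repo.append("{\"name\": \"2\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(repo.read(second), StandardCharsets.UTF_8)); // prints {"name": "2"}

        // A claim whose offset is five bytes too early spans two records.
        Claim skewed = new Claim(second.offset - 5, second.length);
        System.out.println(new String(repo.read(skewed), StandardCharsets.UTF_8));
    }
}
```

Because several FlowFiles share one container, each read depends entirely on the recorded offset and length being right.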
>>
>> My recommendation to track this down would be to find a FlowFile that is
>> corrupt, and then use the data provenance feature [1] to view its lineage.
>> Look at the FlowFiles that were joined together by MergeContent and see if
>> any of those is corrupt.
>>
>> Thanks
>> -Mark
>>
>> [1]
>> http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance
>>
>> On Jun 10, 2020, at 2:07 PM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> Hey Mark,
>>
>> I was thinking over this more and, despite no complaints from Jackson
>> ObjectMapper, is it possible that hidden and/or control characters are
>> present in the JSON values which would then cause MergeContent to behave
>> this way? I looked over the code and nothing jumped out, but there is
>> something we had to do because of how the publisher is setting Kafka header
>> attributes. Some attributes are bytes and not strings converted to bytes,
>> and ConsumeKafka seems to assume that these can always be converted to a
>> String. We had to change the encoding to be ISO-8859-1 due to running into
>> issues with the bytes getting corrupted.
>>
>> I'm also trying to better understand how the content is being stored in
>> the content repository, and whether something is going wrong when writing
>> it out.
>>
>> Thanks,
>> Jason
>>
>> On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>> Hey Jason,
>>
>> Thanks for reaching out. That is definitely odd and not something that
>> I’ve seen or heard about before.
>>
>> Are you certain that the data is not being corrupted upstream of the
>> processor? I ask because the code for the processor that handles writing
>> out the content is pretty straightforward and hasn’t been modified in over
>> 3 years, so I would expect to see it happen often if it were a bug in the
>> MergeContent processor itself. Any chance that you can create a flow
>> template/sample data that recreates the issue? Anything particularly unique
>> about your flow?
>>
>> Thanks
>> -Mark
>>
>>
>> > On Jun 9, 2020, at 6:47 PM, Jason Iannone <bread...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent.
>> > The processor is being fed many flowfiles with individual JSON records.
>> > The records have various field types including a hex-encoded byte[]. We are
>> > not trying to merge JSON records themselves but rather consolidate many
>> > flowfiles into fewer flowfiles.
>> >
>> > What we're seeing is that a random flowfile is split, causing the merged
>> > file to be invalid JSON. When running multiple bins we saw the flowfile
>> > split across bins.
>> >
>> > Example
>> > Flowfile 1: {"name": "1", "hexbytes": "A10F15B11D14", "timestamp": "123456789" }
>> > Flowfile 2: {"name": "2", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>> > Flowfile 3: {"name": "3", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>> >
>> > Merged Result:
>> > {"name": "1", "hexbytes": "A10F15B11D14", "timestamp": "123456789" }
>> > xbytes": "A10F15D14B11", "timestamp": "123456790" }
>> > {"name": "3", "hexbytes": "A10F15D14B11", "timestamp": "123456790" }
>> > {"name": "3", "h
>> >
>> > MergeContent Configuration:
>> > Concurrent Tasks: 4
>> > Merge Strategy: Bin-Packing Algorithm
>> > Merge Format: Binary Concatenation
>> > Attribute Strategy: Keep Only Common Attributes
>> > Min. number of entries: 1000
>> > Max number of entries: 20000
>> > Minimum group size: 10 KB
>> > Maximum number of bins: 5
>> > Header, Footer, and Demarcator are not set.
>> >
>> > We then backed off the settings above, reducing min and max entries and
>> > setting bins to 1 and threads to 1, and still see the same issue.
>> >
>> > Any insights?
>> >
>> > Thanks,
>> > Jason
>>