Jason, content repo writing isn't related to ZooKeeper.
You mention a fork of NiFi for ConsumeKafka. Have you tried using the stock
processors, albeit without whatever feature you needed, so you can narrow in
on the problem?

Joe

On Fri, Jul 3, 2020 at 7:16 AM Jason Iannone <bread...@gmail.com> wrote:

> What role does Zookeeper have in the persistence of flowfile content, and
> does this relate to how MergeContent works or to checking whether the
> content exists? From all observations it appears that MergeContent is
> picking up data that is either not written or not completely written to
> disk.
>
> Thanks,
> Jason
>
> On Tue, Jun 30, 2020 at 10:48 PM Darren Govoni <dar...@ontrenet.com> wrote:
>
>> Run the NiFi JVM in a runtime profiler/analyzer like AppDynamics and see
>> if it detects any memory leaks or dangling unclosed file buffers/IO.
>> Throwing darts, but the problem could be as deep as the Linux kernel or
>> confined inside the JVM for your specific scenario.
>>
>> ------------------------------
>> From: Jason Iannone <bread...@gmail.com>
>> Sent: Tuesday, June 30, 2020 10:36:02 PM
>> To: users@nifi.apache.org <users@nifi.apache.org>
>> Subject: Re: MergeContent resulting in corrupted JSON
>>
>> Previous spotting of the issue was a red herring. We removed our custom
>> code and are still facing a random "org.codehaus.jackson.JsonParseException:
>> Illegal Character" during PutDatabaseRecord due to a flowfile containing
>> malformed JSON post MergeContent. The error never occurs immediately and is
>> usually once we've processed several million records. We did a NOOP run,
>> which was ConsumeKafka -> UpdateCounter, and everything seemed OK.
>>
>> Here's the current form of the flow:
>>
>> 1. ConsumeKafka_2_0 - Encoding headers as ISO-8859-1 due to some
>>    containing binary data
>>    1. I have a fork of NiFi with changes to allow base64 and hex
>>       encoding of select NiFi headers.
>>    2. The next test will be without pulling any headers.
>> 2. RouteOnAttribute - Validate attributes
>> 3. Base64EncodeContent - Content is binary; converting to a format we
>>    can store to later process
>> 4. ExtractText - Copy the Base64-encoded content to an attribute
>> 5. AttributesToJSON - Provenance shows the output as being fine
>> 6. MergeContent - Provenance shows malformed JSON being written in the
>>    combined flowfile.
>> 7. PutDatabaseRecord - Schema specified as Schema Text
>>
>> Since we've removed all traces of custom code, what are people's thoughts
>> on possible causes? Could this be an OS issue, or are there any known
>> issues with specific versions of RHEL?
>>
>> Logically I think it makes sense to remove JSON from the equation as a
>> whole.
>>
>> Thanks,
>> Jason
>>
>> On Wed, Jun 24, 2020 at 2:54 PM Jason Iannone <bread...@gmail.com> wrote:
>>
>> Exactly my thought, and we've been combing through the code but nothing
>> significant has jumped out. Something that does stand out are the NiFi
>> JIRAs NIFI-6923, NIFI-6924, and NIFI-6846. Considering we're on 1.10.0,
>> I've requested upgrading to 1.11.4.
>>
>> Thanks,
>> Jason
>>
>> On Tue, Jun 23, 2020 at 9:05 AM Mark Payne <marka...@hotmail.com> wrote:
>>
>> It should be okay to create a buffer like that, assuming the FlowFile is
>> small. Typically we try to avoid buffering the content of a FlowFile into
>> memory, but if it's a reasonably small FlowFile, that's probably fine.
>>
>> To be honest, if the issue is intermittent and doesn't always happen on
>> the same input, it sounds like a threading/concurrency bug.
>> Do you have a buffer or anything like that as a member variable?
>>
>> On Jun 22, 2020, at 10:02 PM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> I'm now thinking it's due to how we handled reading the flowfile content
>> into a buffer.
>>
>> Previous:
>>
>>     session.read(flowFile, in -> {
>>         atomicVessel.set(ByteStreams.toByteArray(in));
>>     });
>>
>> Current:
>>
>>     final byte[] buffer = new byte[(int) flowFile.getSize()];
>>     session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));
>>
>> Making this change reduced the occurrences of the data corruption, but we
>> still saw it occur. What I'm now wondering is whether sizing the byte
>> array based on flowFile.getSize() is ideal. The contents of the file are
>> raw bytes coming from ConsumeKafka_2_0.
>>
>> Thanks,
>> Jason
>>
>> On Mon, Jun 22, 2020 at 4:51 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>> Jason,
>>
>> Glad to hear it. This is where the data provenance becomes absolutely
>> invaluable. So now you should be able to trace the lineage of that FlowFile
>> back to the start using data provenance. You can see exactly what it looked
>> like when it was received. If it looks wrong there, the provenance shows
>> exactly where it was received from so you know where to look next. If it
>> looks good on receipt, you can trace the data through the flow and see
>> exactly what the data looked like before and after each processor. And when
>> you see which processor resulted in corruption, you can easily download the
>> data as it looked when it went into the processor to make it easy to
>> re-ingest and test.
>>
>> Thanks
>> -Mark
>>
>> On Jun 22, 2020, at 4:46 PM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> I spoke too soon; it must be the magic of sending an email! We found what
>> appears to be corrupted content and captured the binary, hoping to play it
>> through the code and see what's going on.
>>
>> Thanks,
>> Jason
>>
>> On Mon, Jun 22, 2020 at 4:35 PM Jason Iannone <bread...@gmail.com> wrote:
>>
>> Hey Mark,
>>
>> We hit the issue again, and when digging into the lineage we can see that
>> the content is fine coming into MergeContent but is corrupt on the output
>> of the JOIN event. Any other suggestions?
>>
>> Thanks,
>> Jason
>>
>> On Wed, Jun 10, 2020 at 2:26 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>> Jason,
>>
>> Control characters should not cause any problem with MergeContent.
>> MergeContent just copies bytes from one stream to another. It's also worth
>> noting that attributes don't really come into play here. MergeContent is
>> combining the FlowFile content, so even if it has some weird attributes,
>> those won't cause a problem in the output content. NiFi stores attributes
>> as a mapping of String to String key/value pairs (i.e., Map<String, String>).
>> So the processor is assuming that if you want to convert a message header
>> to an attribute, that header must be a string.
>>
>> Content in the repository is stored using "slabs" or "blocks." One
>> processor at a time has the opportunity to write to a file in the content
>> repository. When the processor finishes writing and transfers the FlowFile
>> to the next processor, NiFi keeps track of which file its content was
>> written to, the byte offset where its content starts, and the length of the
>> content.
>> The next time that a processor needs to write to the content of a
>> FlowFile, it may end up appending to that same file on disk, but the
>> FlowFile that the content corresponds to will keep track of the byte
>> offset into the file where its content begins and how many bytes in that
>> file belong to that FlowFile.
>>
>> My recommendation to track this down would be to find a FlowFile that is
>> corrupt, and then use the data provenance feature [1] to view its lineage.
>> Look at the FlowFiles that were joined together by MergeContent and see if
>> any of those is corrupt.
>>
>> Thanks
>> -Mark
>>
>> [1] http://nifi.apache.org/docs/nifi-docs/html/user-guide.html#data_provenance
>>
>> On Jun 10, 2020, at 2:07 PM, Jason Iannone <bread...@gmail.com> wrote:
>>
>> Hey Mark,
>>
>> I was thinking over this more, and despite no complaints from the Jackson
>> ObjectMapper, is it possible that hidden and/or control characters are
>> present in the JSON values which would then cause MergeContent to behave
>> this way? I looked over the code and nothing jumped out, but there is
>> something we had to do because of how the publisher is setting Kafka
>> header attributes. Some attributes are bytes and not strings converted to
>> bytes, and ConsumeKafka seems to assume that these can always be converted
>> to a String. We had to change the encoding to ISO-8859-1 due to running
>> into issues with the bytes getting corrupted.
>>
>> I'm also trying to better understand how the content is being stored in
>> the content repository, and whether something is going wrong when writing
>> it out.
>>
>> Thanks,
>> Jason
>>
>> On Tue, Jun 9, 2020 at 8:02 PM Mark Payne <marka...@hotmail.com> wrote:
>>
>> Hey Jason,
>>
>> Thanks for reaching out. That is definitely odd and not something that
>> I've seen or heard about before.
>>
>> Are you certain that the data is not being corrupted upstream of the
>> processor? I ask because the code for the processor that handles writing
>> out the content is pretty straightforward and hasn't been modified in over
>> 3 years, so I would expect to see it happen often if it were a bug in the
>> MergeContent processor itself. Any chance that you can create a flow
>> template/sample data that recreates the issue? Anything particularly
>> unique about your flow?
>>
>> Thanks
>> -Mark
>>
>> > On Jun 9, 2020, at 6:47 PM, Jason Iannone <bread...@gmail.com> wrote:
>> >
>> > Hi all,
>> >
>> > Within NiFi 1.10.0 we're seeing unexpected behavior with MergeContent.
>> > The processor is being fed many flowfiles with individual JSON records.
>> > The records have various field types, including a hex-encoded byte[].
>> > We are not trying to merge the JSON records themselves but rather to
>> > consolidate many flowfiles into fewer flowfiles.
>> >
>> > What we're seeing is that a random flowfile is split, causing the merged
>> > file to be invalid JSON. When running multiple bins we saw the flowfile
>> > split across bins.
>> >
>> > Example
>> > Flowfile 1: {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
>> > Flowfile 2: {"name": "2", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>> > Flowfile 3: {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>> >
>> > Merged Result:
>> > {"name": "1", "hexbytes": A10F15B11D14", timestamp: "123456789" }
>> > xbytes": A10F15D14B11", timestamp: "123456790" }
>> > {"name": "3", "hexbytes": A10F15D14B11", timestamp: "123456790" }
>> > {"name": "3", "h
>> >
>> > MergeContent Configuration:
>> > Concurrent Tasks: 4
>> > Merge Strategy: Bin-Packing Algorithm
>> > Merge Format: Binary Concatenation
>> > Attribute Strategy: Keep Only Common Attributes
>> > Minimum number of entries: 1000
>> > Maximum number of entries: 20000
>> > Minimum group size: 10 KB
>> > Maximum number of bins: 5
>> > Header, Footer, and Demarcator are not set.
>> >
>> > We then backed off the above config to reduce the min and max entries,
>> > bins to 1, and threads to 1, and still see the same issue.
>> >
>> > Any insights?
>> >
>> > Thanks,
>> > Jason
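
A minimal sketch of the buffer-handling patterns discussed in this thread,
assuming a custom processor built on NiFi's public AbstractProcessor /
ProcessSession API. The class and relationship names are hypothetical and
this is not the actual custom code from the thread; it only illustrates why a
buffer held in a member variable is unsafe when a processor runs with more
than one concurrent task, while the per-invocation buffer sized from
flowFile.getSize() (the pattern Jason switched to) stays confined to a single
thread.

    import java.util.Collections;
    import java.util.Set;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;
    import org.apache.nifi.stream.io.StreamUtils;

    // Hypothetical example processor; not the custom processor from this thread.
    public class BufferPerInvocationProcessor extends AbstractProcessor {

        static final Relationship REL_SUCCESS = new Relationship.Builder()
                .name("success")
                .description("FlowFiles whose content was read successfully")
                .build();

        // UNSAFE pattern: a buffer kept as a member variable is shared by every
        // concurrent task, so with Concurrent Tasks > 1 two threads can overwrite
        // each other's bytes and produce exactly the kind of interleaved/corrupt
        // content described above.
        // private byte[] sharedBuffer;

        @Override
        public Set<Relationship> getRelationships() {
            return Collections.singleton(REL_SUCCESS);
        }

        @Override
        public void onTrigger(final ProcessContext context, final ProcessSession session)
                throws ProcessException {
            final FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }

            // SAFER pattern (the one Jason switched to): allocate the buffer locally,
            // once per invocation, sized from flowFile.getSize(), i.e. the content
            // length NiFi tracks for this FlowFile's claim (file + offset + length).
            // The cast assumes the content is small enough to fit in an int / on heap.
            final byte[] buffer = new byte[(int) flowFile.getSize()];
            session.read(flowFile, in -> StreamUtils.fillBuffer(in, buffer, true));

            // ... work with the bytes here; the local array is confined to this thread ...
            getLogger().debug("Read " + buffer.length + " bytes for " + flowFile);

            session.transfer(flowFile, REL_SUCCESS);
        }
    }

If the content can be large, Mark's general advice above still applies:
avoid buffering the whole FlowFile into memory and stream it instead.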