[ 
https://issues.apache.org/jira/browse/NIFI-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15752088#comment-15752088
 ] 

Mark Payne commented on NIFI-3205:
----------------------------------

[~jackowaya] I used the DebugFlow processor to replicate the scenario that 
you're describing here. That processor really is more of a "Debug Framework 
when things go bad" processor. Allows you to configure things such as "write 
the content of the flowfile 5 times" and then "rollback the session without 
penalizing". So I did just that - set Write Iterations to 5. Everything seemed 
to work okay. But it can take a couple of minutes before the Content Repo is 
cleaned up. After that, it may still have a couple of files hanging out because 
they are still "writable." That's normal.

I then set it to rollback at the end instead of committing the session (set the 
"FlowFIle Success Iterations" property to 0 and the "FlowFile Rollback 
Iterations" property to 1).  So write 5 times and then call session.rollback(). 
Content repo went up to about 2 GB and then I stopped the processor. Waited a 
few minutes for the Content Repo to clean up, but it never did! So it looks 
like what you are describing is accurate: If the content of the FlowFile is 
written to multiple times, and then the session is rolled back, the content may 
not get cleaned up properly (I say may not because there are some conditions 
where it would and some where it wouldn't). It doesn't have a setting to remove 
the FlowFile at the end so i haven't tried that yet.

Many thanks for digging in here and keeping the conversation going with great 
details!

> Uncaught Failures Can Leave New Flow Files on Disk
> --------------------------------------------------
>
>                 Key: NIFI-3205
>                 URL: https://issues.apache.org/jira/browse/NIFI-3205
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Alan Jackoway
>
> We have been hitting a situation where our content repository quickly fills 
> the entire disk despite having archiving off and close to nothing queued.
> We believe this problem happens more often when a processor that creates many 
> flow files fails in the middle.
> I then created this test script and deployed it on a new nifi with a 100KB 
> GenerateFlowFile in front of it. The script makes 5 copies of the incoming 
> flow file, then does session.remove on those copies, then throws a 
> RuntimeException. However, the content repository grows 500KB every time it 
> runs. Then when you restart nifi, it cleans up the content repository with 
> messages like this:
> {noformat}
> 2016-12-15 11:17:29,774 INFO [main] o.a.n.c.repository.FileSystemRepository 
> Found unknown file 
> /Users/alanj/nifi-1.1.0/content_repository/1/1481818525279-1 (1126400 bytes) 
> in File System Repository; archiving file
> 2016-12-15 11:17:29,778 INFO [main] o.a.n.c.repository.FileSystemRepository 
> Found unknown file 
> /Users/alanj/nifi-1.1.0/content_repository/2/1481818585493-2 (409600 bytes) 
> in File System Repository; archiving file
> {noformat}
> The test processor is the following:
> {code:java}
> // Copyright 2016 (c) Cloudera
> package com.cloudera.edh.nifi.processors.bundles;
> import com.google.common.collect.Lists;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.util.List;
> import org.apache.nifi.annotation.behavior.InputRequirement;
> import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.processor.io.InputStreamCallback;
> import org.apache.nifi.processor.io.OutputStreamCallback;
> import org.apache.nifi.stream.io.StreamUtils;
> /**
>  * Makes 5 copies of an incoming file, then fails and rolls back.
>  */
> @InputRequirement(value = Requirement.INPUT_REQUIRED)
> public class CopyAndFail extends AbstractProcessor {
>   @Override
>   public void onTrigger(ProcessContext context, ProcessSession session)
>       throws ProcessException {
>     FlowFile inputFile = session.get();
>     if (inputFile == null) {
>       context.yield();
>       return;
>     }
>     final List<FlowFile> newFiles = Lists.newArrayList();
>     
>     // Copy the file 5 times (simulates us opening a zip file and unpacking 
> its contents)
>     for (int i = 0; i < 5; i++) {
>       session.read(inputFile, new InputStreamCallback() {
>         @Override
>         public void process(InputStream inputStream) throws IOException {
>           FlowFile ff = session.create(inputFile);
>           ff = session.write(ff, new OutputStreamCallback() {
>             @Override
>             public void process(final OutputStream out) throws IOException {
>               StreamUtils.copy(inputStream, out);
>             }
>           });
>           newFiles.add(ff);
>         }
>       });
>     }
>     
>     getLogger().warn("Removing the new files");
>     System.err.println("Removing the new files");
>     session.remove(newFiles);
>     
>     // Simulate an error handling some file in the zip after unpacking the 
> rest
>     throw new RuntimeException();
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to