[ https://issues.apache.org/jira/browse/NIFI-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15754759#comment-15754759 ]

ASF GitHub Bot commented on NIFI-3205:
--------------------------------------

Github user jackowaya commented on the issue:

    https://github.com/apache/nifi/pull/1336
  
    Too far out of my comfort zone to give a true +1. The only thing I couldn't 
figure out is how the repoRecord object in StandardProcessSession works. It 
looks like it falls out of scope, but there must be some magic going on with 
that context call.
    
    All that to say: the tests look good to me. If they're passing, this is good.


> Uncaught Failures Can Leave New Flow Files on Disk
> --------------------------------------------------
>
>                 Key: NIFI-3205
>                 URL: https://issues.apache.org/jira/browse/NIFI-3205
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Alan Jackoway
>            Assignee: Mark Payne
>            Priority: Critical
>             Fix For: 1.2.0
>
>
> We have been hitting a situation where our content repository quickly fills 
> the entire disk despite having archiving off and close to nothing queued.
> We believe this problem happens more often when a processor that creates many 
> flow files fails in the middle.
> I then created this test script and deployed it on a new NiFi with a 100KB 
> GenerateFlowFile in front of it. The script makes 5 copies of the incoming 
> flow file, then does session.remove on those copies, then throws a 
> RuntimeException. However, the content repository grows 500KB every time it 
> runs. Then when you restart NiFi, it cleans up the content repository with 
> messages like this:
> {noformat}
> 2016-12-15 11:17:29,774 INFO [main] o.a.n.c.repository.FileSystemRepository 
> Found unknown file 
> /Users/alanj/nifi-1.1.0/content_repository/1/1481818525279-1 (1126400 bytes) 
> in File System Repository; archiving file
> 2016-12-15 11:17:29,778 INFO [main] o.a.n.c.repository.FileSystemRepository 
> Found unknown file 
> /Users/alanj/nifi-1.1.0/content_repository/2/1481818585493-2 (409600 bytes) 
> in File System Repository; archiving file
> {noformat}
> The test processor is the following:
> {code:java}
> // Copyright 2016 (c) Cloudera
> package com.cloudera.edh.nifi.processors.bundles;
> import com.google.common.collect.Lists;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.util.List;
> import org.apache.nifi.annotation.behavior.InputRequirement;
> import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.processor.io.InputStreamCallback;
> import org.apache.nifi.processor.io.OutputStreamCallback;
> import org.apache.nifi.stream.io.StreamUtils;
> /**
>  * Makes 5 copies of an incoming file, then fails and rolls back.
>  */
> @InputRequirement(value = Requirement.INPUT_REQUIRED)
> public class CopyAndFail extends AbstractProcessor {
>   @Override
>   public void onTrigger(ProcessContext context, ProcessSession session)
>       throws ProcessException {
>     FlowFile inputFile = session.get();
>     if (inputFile == null) {
>       context.yield();
>       return;
>     }
>     final List<FlowFile> newFiles = Lists.newArrayList();
>     
>     // Copy the file 5 times (simulates us opening a zip file and unpacking its contents)
>     for (int i = 0; i < 5; i++) {
>       session.read(inputFile, new InputStreamCallback() {
>         @Override
>         public void process(InputStream inputStream) throws IOException {
>           FlowFile ff = session.create(inputFile);
>           ff = session.write(ff, new OutputStreamCallback() {
>             @Override
>             public void process(final OutputStream out) throws IOException {
>               StreamUtils.copy(inputStream, out);
>             }
>           });
>           newFiles.add(ff);
>         }
>       });
>     }
>     
>     getLogger().warn("Removing the new files");
>     System.err.println("Removing the new files");
>     session.remove(newFiles);
>     
>     // Simulate an error handling some file in the zip after unpacking the rest
>     throw new RuntimeException();
>   }
> }
> {code}
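
For reference, here is a minimal sketch of driving the CopyAndFail processor above with NiFi's mock test framework (nifi-mock's TestRunners/TestRunner). The wrapper class name is made up for illustration, and exactly how the mock runner surfaces the RuntimeException thrown from onTrigger is an assumption, not something stated in this ticket:

{code:java}
import com.cloudera.edh.nifi.processors.bundles.CopyAndFail;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class CopyAndFailRepro {
  public static void main(String[] args) {
    // Wrap the processor from the ticket in a mock test runner
    final TestRunner runner = TestRunners.newTestRunner(new CopyAndFail());

    // Roughly matches the 100KB GenerateFlowFile used in the original reproduction
    runner.enqueue(new byte[100 * 1024]);

    try {
      // onTrigger throws a RuntimeException by design; the mock runner is
      // expected to report that failure here
      runner.run();
    } catch (final Throwable expected) {
      // Whether this arrives as the raw RuntimeException or wrapped in an
      // AssertionError depends on the mock runner; either way, the point of
      // NIFI-3205 is what the content repository looks like after the failure
    }
  }
}
{code}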


