[ https://issues.apache.org/jira/browse/NIFI-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15754736#comment-15754736 ]
Alan Jackoway commented on NIFI-3205:
-------------------------------------

I was misunderstanding. I am calling session.write multiple times on the same flow file, which produces several output flow files. I was thinking of it from the output side, not the write side. I think the PR handles it, though it's a section of the code I'm not familiar with.

> Uncaught Failures Can Leave New Flow Files on Disk
> --------------------------------------------------
>
>                 Key: NIFI-3205
>                 URL: https://issues.apache.org/jira/browse/NIFI-3205
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Alan Jackoway
>            Assignee: Mark Payne
>            Priority: Critical
>             Fix For: 1.2.0
>
>
> We have been hitting a situation where our content repository quickly fills
> the entire disk despite having archiving off and close to nothing queued.
> We believe this problem happens more often when a processor that creates many
> flow files fails in the middle.
> I then created this test script and deployed it on a new nifi with a 100KB
> GenerateFlowFile in front of it. The script makes 5 copies of the incoming
> flow file, then does session.remove on those copies, then throws a
> RuntimeException. However, the content repository grows 500KB every time it
> runs.
> Then when you restart nifi, it cleans up the content repository with
> messages like this:
> {noformat}
> 2016-12-15 11:17:29,774 INFO [main] o.a.n.c.repository.FileSystemRepository Found unknown file /Users/alanj/nifi-1.1.0/content_repository/1/1481818525279-1 (1126400 bytes) in File System Repository; archiving file
> 2016-12-15 11:17:29,778 INFO [main] o.a.n.c.repository.FileSystemRepository Found unknown file /Users/alanj/nifi-1.1.0/content_repository/2/1481818585493-2 (409600 bytes) in File System Repository; archiving file
> {noformat}
> The test processor is the following:
> {code:java}
> // Copyright 2016 (c) Cloudera
> package com.cloudera.edh.nifi.processors.bundles;
> import com.google.common.collect.Lists;
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.util.List;
> import org.apache.nifi.annotation.behavior.InputRequirement;
> import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
> import org.apache.nifi.flowfile.FlowFile;
> import org.apache.nifi.processor.AbstractProcessor;
> import org.apache.nifi.processor.ProcessContext;
> import org.apache.nifi.processor.ProcessSession;
> import org.apache.nifi.processor.exception.ProcessException;
> import org.apache.nifi.processor.io.InputStreamCallback;
> import org.apache.nifi.processor.io.OutputStreamCallback;
> import org.apache.nifi.stream.io.StreamUtils;
> /**
>  * Makes 5 copies of an incoming file, then fails and rolls back.
>  */
> @InputRequirement(value = Requirement.INPUT_REQUIRED)
> public class CopyAndFail extends AbstractProcessor {
>   @Override
>   public void onTrigger(ProcessContext context, ProcessSession session)
>       throws ProcessException {
>     FlowFile inputFile = session.get();
>     if (inputFile == null) {
>       context.yield();
>       return;
>     }
>     final List<FlowFile> newFiles = Lists.newArrayList();
>
>     // Copy the file 5 times (simulates us opening a zip file and unpacking its contents)
>     for (int i = 0; i < 5; i++) {
>       session.read(inputFile, new InputStreamCallback() {
>         @Override
>         public void process(InputStream inputStream) throws IOException {
>           FlowFile ff = session.create(inputFile);
>           ff = session.write(ff, new OutputStreamCallback() {
>             @Override
>             public void process(final OutputStream out) throws IOException {
>               StreamUtils.copy(inputStream, out);
>             }
>           });
>           newFiles.add(ff);
>         }
>       });
>     }
>
>     getLogger().warn("Removing the new files");
>     System.err.println("Removing the new files");
>     session.remove(newFiles);
>
>     // Simulate an error handling some file in the zip after unpacking the rest
>     throw new RuntimeException();
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
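
For anyone reproducing the issue, one way to check the 500KB-per-run growth described in the report is to total the bytes under the content repository directory before and after the test processor runs. The following is only a minimal sketch: `RepoSize` and `totalBytes` are hypothetical names that are not part of NiFi, and the default `content_repository` path is an assumption about the install layout, so pass the real path as the first argument.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/** Hypothetical helper (not part of NiFi): totals the bytes of all regular files under a directory. */
public class RepoSize {

  /** Returns the combined size in bytes of every regular file found under root. */
  static long totalBytes(Path root) throws IOException {
    // Files.walk returns a lazily populated stream that must be closed.
    try (Stream<Path> paths = Files.walk(root)) {
      return paths
          .filter(Files::isRegularFile)
          // File.length() returns 0 rather than throwing if a file disappears mid-scan.
          .mapToLong(p -> p.toFile().length())
          .sum();
    }
  }

  public static void main(String[] args) throws IOException {
    // Assumption: the content repository path is given as the first argument;
    // the fallback below is only a guess at a typical relative location.
    Path root = Paths.get(args.length > 0 ? args[0] : "content_repository");
    if (!Files.isDirectory(root)) {
      System.err.println("Not a directory: " + root);
      return;
    }
    System.out.println(totalBytes(root) + " bytes under " + root);
  }
}
```

Running it once before and once after a single trigger of the test processor and subtracting the two totals should give the per-run growth, provided nothing else is writing to the repository in between.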