Joe,

How we managed to write a FlowFile repository that we later couldn’t load back into heap is confusing, but we did it somehow (and we had even increased heap from 300GB, which is what it was set to when we created the repo, to 500GB)… A user had some large JSON blocks stored in an attribute. They then did some versioning on this attribute and duplicated the value into other attribute names. They loaded ~60k FlowFiles into the node over about 90 seconds, all with large JSON attributes. When the 2 minute checkpoint tried to run, it ran out of heap.
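As a rough illustration of how quickly attributes like this add up, here is a minimal back-of-the-envelope sketch. Only the ~60k FlowFile count comes from the message; the number of attribute copies and the size of each JSON value are hypothetical placeholders, and the estimate assumes each copy ends up as its own string in memory and ignores all per-object overhead.

```java
// Back-of-the-envelope estimate of heap consumed by large attribute values.
// This is not NiFi code; the class and method names are made up for illustration.
class AttributeHeapEstimate {

    /**
     * Lower bound on the bytes needed to hold the attribute values alone.
     *
     * @param flowFiles         number of FlowFiles held in memory
     * @param copiesPerFlowFile how many attributes carry a copy of the value
     * @param charsPerValue     length of one attribute value in characters
     * @param bytesPerChar      2 for UTF-16 backed strings, 1 for Latin-1 compact strings
     */
    static long estimateBytes(long flowFiles, int copiesPerFlowFile, long charsPerValue, int bytesPerChar) {
        long values = Math.multiplyExact(flowFiles, (long) copiesPerFlowFile);
        long bytesPerValue = Math.multiplyExact(charsPerValue, (long) bytesPerChar);
        return Math.multiplyExact(values, bytesPerValue);
    }

    public static void main(String[] args) {
        long flowFiles = 60_000L;        // from the message: ~60k FlowFiles
        int copies = 3;                  // HYPOTHETICAL: original attribute plus two duplicates
        long charsPerValue = 1_000_000L; // HYPOTHETICAL: about one million characters of JSON
        long bytes = estimateBytes(flowFiles, copies, charsPerValue, 2);
        System.out.println("Estimated attribute bytes: " + bytes);
    }
}
```

Plugging in measured attribute sizes instead of the placeholders gives a quick sanity check on whether a burst of FlowFiles can plausibly fit in the configured heap.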
Another thing we learned is that you need to have 2x the disk capacity for the FlowFile Repository, because when NiFi goes to checkpoint your data it creates a fully merged duplicate, and both the original and new checkpoints exist on disk at the same time for a little while.

It was a little frustrating that missing overflow files stopped the journal from loading. To me, it felt like this should work more like a FlowFile that is missing its queue, or content in the content repository that is missing its FlowFile: instead of failing, just keep loading the journal even if the overflow is missing. I’ll look at making a Jira for this.

One cool thing that came out of this was writing scripts to raw read/modify the overflow files, which I’m sure could be extended with some more work into a full offline cleanup utility for the FlowFile repository.

Example of loading a Journal:

    StandardResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
    DataInputStream dataIn = new DataInputStream(new FileInputStream(new File("/home/nifi/569892338.journal")));

    // Read the header/serialization details
    final String waliImplementationClass = dataIn.readUTF();
    final int waliImplementationVersion = dataIn.readInt();
    final String serdeEncoding = dataIn.readUTF(); // ignore serde class name for now
    final int serdeVersion = dataIn.readInt();
    final int serdeHeaderLength = dataIn.readInt();

    // The serde only needs a queue object that can report its identifier
    SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager) {
        @Override
        protected FlowFileQueue getFlowFileQueue(String queueId) {
            return new FlowFileQueue() {
                @Override
                public String getIdentifier() {
                    return queueId;
                }

                // …. All other override methods not used ….
            };
        }
    };

    // Read the serde header (schema), limited to the length recorded in the journal
    final InputStream serdeHeaderIn = new LimitingInputStream(dataIn, serdeHeaderLength);
    final DataInputStream dis = new DataInputStream(serdeHeaderIn);
    serde.readHeader(dis);

At this point you can either read the journal (might need a few more dataIn.reads), or as I did, you can go directly to reading overflow files.

Here is one that tells you which queues are referenced and by how many FlowFiles. Overflow files don’t have a header/schema like the journals do, so you have to load it from the journal, and then use the same serde to read the overflow files.

    private Map<String, Integer> countQueues(String file, SchemaRepositoryRecordSerde serde, int serdeVersion) throws IOException {
        Map<String, Integer> counts = new HashMap<>();

        try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(new File(file)))) {
            // deserializeRecord returns null once the end of the file is reached
            RepositoryRecord rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            while (rr != null) {
                final String queue = rr.getOriginalQueue().getIdentifier();
                counts.merge(queue, 1, Integer::sum);
                rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            }
        }

        return counts;
    }

Here is one where I selectively remove FlowFiles from existing overflow files based on their attributes/values and write a new file out:

    // Note: the queues parameter is not used in this version; filtering is by attribute only
    private void removeQueuedFiles(String file, String newFile, List<String> queues, SchemaRepositoryRecordSerde serde, int serdeVersion) throws IOException {
        int total = 0;
        int saved = 0;

        try (DataInputStream dataInputStream = new DataInputStream(new FileInputStream(new File(file)));
             DataOutputStream outStream = new DataOutputStream(new FileOutputStream(new File(newFile)))) {
            RepositoryRecord rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            while (rr != null) {
                total++;
                Map<String, String> attrs = rr.getCurrent().getAttributes();
                // Keep every record except those whose attribute value contains the search term
                if (!(attrs.containsKey("attributename") && attrs.get("attributename").contains("searchterm"))) {
                    serde.serializeRecord(rr, outStream);
                    saved++;
                }
                rr = serde.deserializeRecord(dataInputStream, serdeVersion);
            }
        }

        System.out.println(file + " - " + saved + " / " + total);
    }

From: Joe Witt <joe.w...@gmail.com>
Sent: Thursday, August 15, 2019 10:58 AM
To: users@nifi.apache.org
Subject: Re: [EXT] Re: FlowFile Repository can't checkpoint, out of heap space.

Peter

All the details you can share on this would be good. First, we should be resilient to any sort of repo corruption in the event of heap issues. While obviously the flow isn't in a good state at that point, the saved state should be reliable/recoverable. Second, how the repo/journals got that large itself should be evaluated/considered/determined. A full JIRA/description of the situation/logs/known state would be worthy of further resolution.

Thanks

On Thu, Aug 15, 2019 at 12:50 PM Peter Wicks (pwicks) <pwi...@micron.com> wrote:

We were able to recover this morning. In the end we deleted the queues that were causing trouble from the Flow, and when the problem node came online it deleted the FlowFiles all on its own, since the queue did not exist. Since this is done during the FlowFile Repository load into memory, it didn’t run out of heap. But before we got to that point we maxed out heap, 500GB! All our server had to offer.

I also tried scripting a cleanup of the journal’s overflow files, which failed because the journal keeps track of those files and won’t restore if some are missing.

I’m thinking of building some nifi-utility functions for doing emergency cleanup of the FlowFile repository, where you can specify a Queue ID and it removes those files, or maybe doing an offline compaction.

Thanks,
Peter

From: Brandon DeVries <b...@jhu.edu>
Sent: Thursday, August 15, 2019 9:53 AM
To: users@nifi.apache.org
Subject: [EXT] Re: FlowFile Repository can't checkpoint, out of heap space.
Peter,

Unfortunately, I don't have a perfect solution for your current problem. I would try starting with autoResume=false, just to try to limit what's going on in the system. If possible, you can also try temporarily giving the JVM more heap.

This is, however, the use case that led to the idea of "recovery mode" in the new RocksDBFlowFileRepository[1] that should be in NiFi 1.10.0 (the documentation[2] is attached to the ticket):

"[Recovery mode] limits the number of FlowFiles loaded into the graph at a time, while not actually removing any FlowFiles (or content) from the system. This allows for the recovery of a system that is encountering OutOfMemory errors or similar on startup..."

[1] https://issues.apache.org/jira/browse/NIFI-4775
[2] https://issues.apache.org/jira/secure/attachment/12976954/RocksDBFlowFileRepo.html

On Wed, Aug 14, 2019 at 12:12 PM Peter Wicks (pwicks) <pwi...@micron.com> wrote:

I have a node in a cluster whose FlowFile repository grew so fast that it exceeded the amount of available heap space and now can't checkpoint. Or that is my interpretation of the error:

"Cannot update journal file flowfile_repository/journals/####.journal because this journal has already encountered a failure when attempting to write to the file."

Additionally, on restart, we see that NiFi failed to start because it ran out of heap space while doing a SchemaRecordReader.readFieldValue. Feeling a bit stuck on where to go from here.

Based on metrics we collect, we see a large increase in FlowFiles on that node right before it crashed, and in Linux we see the following:

94G ./journals/overflow-569618072
356G ./journals/overflow-569892338

Oh, and a 280 GB checkpoint file.

There are a few queues/known FlowFiles that are probably the problem, and I’m OK with dropping them, but there is plenty of other data in there too that I don’t want to lose…

Thanks,
Peter