Joe,

How we managed to write a FlowFile repository that we later couldn’t load back 
into heap is confusing, but we did it somehow (and we even increased heap from 
300GB, what it was set to when we created the repo, to 500GB’s)… A user had 
some large JSON blocks stored in an attribute. They then did some versioning on 
this attribute and duplicated the value into other attribute names. They loaded 
~60k FlowFile’s into the node over about 90 seconds, all with large JSON 
attributes.  When the 2 minute checkpoint tried to run, it ran out of heap.

Another thing we learned is that you need to have 2x the disk capacity for the 
FlowFile Repository. Because when NiFi goes to checkpoint your data it creates 
a fully merged duplicate, and both the original and new checkpoints exist on 
disk at the same time for a little while.

It was a little frustrating that missing overflow files stopped the journal 
from loading.  To me, it felt like this should work more like a FlowFile that 
is missing it’s queue, or content in the content repository missing it’s 
FlowFile.  Instead of failing to load the journal, just keep loading the 
journal even if the overflow is missing. I’ll look at making a Jira for this.

One cool thing that came out of this was writing scripts to raw read/modify the 
overflow files, and I’m sure with some more work could be extended to a full 
FlowFile repository offline cleanup utility.

Example of loading a Journal:

        StandardResourceClaimManager resourceClaimManager = new 
StandardResourceClaimManager();
        DataInputStream dataIn = new DataInputStream(new FileInputStream(new 
File("/home/nifi/569892338.journal")));

        // Read the header/serialization details
        final String waliImplementationClass = dataIn.readUTF();
        final int waliImplementationVersion = dataIn.readInt();
        final String serdeEncoding = dataIn.readUTF(); // ignore serde class 
name for now
        final int serdeVersion = dataIn.readInt();
        final int serdeHeaderLength = dataIn.readInt();

        SchemaRepositoryRecordSerde serde = new 
SchemaRepositoryRecordSerde(resourceClaimManager){
            @Override
            protected FlowFileQueue getFlowFileQueue(String queueId) {
                return new FlowFileQueue() {
                    @Override
                    public String getIdentifier() {
                        return queueId;

                    }

…. All other override methods not used ….
                };
            }
        };

        final InputStream serdeHeaderIn = new LimitingInputStream(dataIn, 
serdeHeaderLength);
        final DataInputStream dis = new DataInputStream(serdeHeaderIn);

        serde.readHeader(dis);


At this point you can either read the journal (might need a few more 
dataIn.reads), or as I did, you can go directly to reading overflow files.  
Here is one that tells you which queues are referenced and by how many 
FlowFile’s.
Overflow file’s don’t have a header/schema like the journals do, so you have to 
load them from the journal, and then use the same serde to read the overflow 
files.

    private Map<String, Integer> countQueues(String file, 
SchemaRepositoryRecordSerde serde, int serdeVersion) throws IOException {
        Map<String, Integer> counts = new HashMap<>();
        DataInputStream dataInputStream = new DataInputStream(new 
FileInputStream(new File(file)));

        RepositoryRecord rr = serde.deserializeRecord(dataInputStream, 
serdeVersion);
        while(rr != null){
            final String queue = rr.getOriginalQueue().getIdentifier();
            if(counts.containsKey(queue)) {
                counts.put(queue, counts.get(queue) + 1);
            } else {
                counts.put(queue, 1);
            }

            rr = serde.deserializeRecord(dataInputStream, serdeVersion);
        }

        dataInputStream.close();
        return counts;
    }

Here is one where I selectively remove FlowFile’s from existing overflow files 
based on their attributes/values and write a new file out:

    private void removeQueuedFiles(String file, String newFile,
                                   List<String> queues,
                                   SchemaRepositoryRecordSerde serde, int 
serdeVersion) throws IOException {
        DataInputStream dataInputStream = new DataInputStream(new 
FileInputStream(new File(file)));

        final DataOutputStream outStream = new DataOutputStream(new 
FileOutputStream(new File(newFile)));

        int total=0;
        int saved=0;
        RepositoryRecord rr = serde.deserializeRecord(dataInputStream, 
serdeVersion);

        while(rr != null){
            total++;
            Map<String, String> attrs = rr.getCurrent().getAttributes();
            if(!(attrs.containsKey("attributename")
                    && attrs.get("attributename ").contains("searchterm"))) {
                serde.serializeRecord(rr, outStream);
                saved++;
            }

            rr = serde.deserializeRecord(dataInputStream, serdeVersion);
        }

        outStream.close();
        dataInputStream.close();

        System.out.println(file + " - " + saved + " / " + total);
    }

From: Joe Witt <joe.w...@gmail.com>
Sent: Thursday, August 15, 2019 10:58 AM
To: users@nifi.apache.org
Subject: Re: [EXT] Re: FlowFile Repository can't checkpoint, out of heap space.

Peter

All the details you can share on this would be good.  First, we should be 
resilient to any sort of repo corruption in the event of heap issues.  While 
obviously the flow isn't in a good state at that point the saved state should 
be reliable/recoverable.  Second, how the repo/journals got that large itself 
should be evaluated/considered/determined.  A full JIRA/description of the 
situation/logs/known state would be worthy of further resolution.

Thanks

On Thu, Aug 15, 2019 at 12:50 PM Peter Wicks (pwicks) 
<pwi...@micron.com<mailto:pwi...@micron.com>> wrote:
We were able to recover this morning, in the end we deleted the queues that 
were causing trouble from the Flow, and when the problem node came online it 
deleted the FlowFile’s all on its own, since the queue did not exist. Since 
this is done during the FlowFile Repository load into memory, it didn’t run out 
of heap.

But before we go to that point we maxed out heap, 500GB’s!  All our server had 
to offer. I also tried scripting a cleanup of the journals overflow files. 
Which failed, because the journal keeps track of those files, and won’t restore 
if some are missing.  I’m thinking of building some nifi-utility functions for 
doing emergency cleanup of the FlowFile repository where you can specify a 
Queue ID and it removes those files, or maybe doing an offline compaction.

Thanks,
  Peter


From: Brandon DeVries <b...@jhu.edu<mailto:b...@jhu.edu>>
Sent: Thursday, August 15, 2019 9:53 AM
To: users@nifi.apache.org<mailto:users@nifi.apache.org>
Subject: [EXT] Re: FlowFile Repository can't checkpoint, out of heap space.


Peter,

Unfortunately, I don't have a perfect solution for your current problem.  I 
would try starting with autoResume=false, just to try to limit what's going on 
in the system.  If possible, you can also try temporarily giving the JVM more 
heap.

This is, however, the use case that led to the idea of "recovery mode" in the 
new RocksDBFlowFileRepository[1] that should be in nifi 1.10.0 (the 
documentation[2] is attached to the ticket):

"[Recovery mode] limits the number of FlowFiles loaded into the graph at a 
time, while not actually removing any FlowFiles (or content) from the system. 
This allows for the recovery of a system that is encountering OutOfMemory 
errors or similar on startup..."

[1] 
https://issues.apache.org/jira/browse/NIFI-4775<https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FNIFI-4775&data=02%7C01%7Cpwicks%40micron.com%7Ce785de36aeeb49a54bf308d721a1d839%7Cf38a5ecd28134862b11bac1d563c806f%7C0%7C0%7C637014851336060085&sdata=BT%2FQoS0CeWySXE5VIJblhE%2BLaXW7ziR1rcfUlRQdnBc%3D&reserved=0>
[2] 
https://issues.apache.org/jira/secure/attachment/12976954/RocksDBFlowFileRepo.html<https://nam01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fsecure%2Fattachment%2F12976954%2FRocksDBFlowFileRepo.html&data=02%7C01%7Cpwicks%40micron.com%7Ce785de36aeeb49a54bf308d721a1d839%7Cf38a5ecd28134862b11bac1d563c806f%7C0%7C0%7C637014851336070081&sdata=GMs8yj24VVSL0Igk8wYkYvLlx0i6wtXVI8xRU3VsL0Y%3D&reserved=0>

On Wed, Aug 14, 2019 at 12:12 PM Peter Wicks (pwicks) 
<pwi...@micron.com<mailto:pwi...@micron.com>> wrote:
I have a node in a cluster whose FlowFile repository grew so fast that it 
exceeded the amount of available heap space and now can't checkpoint. Or that 
is my interpretation of the error.

"Cannot update journal file flowfile_repository/journals/####.journal because 
this journal  has already encountered a failure when attempting to write to the 
file."
Additionally, on restart, we see NiFi failed to restart because it ran out of 
heap space while doing a SchemaRecordReader.readFieldValue.  Feeling a bit 
stuck on where to go from here.

Based on metrics we collect, we see a large increase in FlowFile's on that node 
right before it crashed, and in linux we see the following:
94G     ./journals/overflow-569618072
356G    ./journals/overflow-569892338

Oh, and a 280 GB checkpoint file

There are a few queues/known FlowFile’s that are probably the problem, and I’m 
OK with dropping them, but there is plenty of other data in there too that I 
don’t want to lose…

Thanks,
  Peter

Reply via email to