Martin,

I agree with Andrew on the point of a single virtual core not being much. I haven't
dealt with Google Compute Engine personally, but I wouldn't expect much out of a
comparable VM such as an AWS t1.micro instance.

That being said, let's look a bit deeper at some of the performance 
considerations that we
should take into account.

Given that you have < 4 GB RAM and your CSV file is 2.5 GB uncompressed, it is
quite possible that your operating system's disk cache is not benefiting you much.
In that case, you'd be hitting the disks quite a lot just to read the data, so adding
more RAM would likely help there as well. Quite often, with a reasonable amount of
RAM, NiFi will never (or almost never) have to read FlowFile content from disk
because the disk cache will have it buffered for us. This makes a huge difference
in performance.

You're also splitting each incoming FlowFile into *LOTS* of tiny FlowFiles, and that
carries quite a bit of overhead. Each FlowFile has to be tracked in the FlowFile
Repository, and everything that happens to it gets recorded in the Provenance
Repository. That becomes very expensive for large numbers of FlowFiles.
RouteOnContent is also worth looking into, as I think it may be doing some
not-so-efficient things when scanning content: namely, rather than reading the data
into a buffer of its own, it reads one byte at a time from a BufferedInputStream,
which is very expensive for large content.

The good news is that you can probably simplify your flow and significantly improve
NiFi's performance without a tremendous amount of effort :)

Because you are using CSV data, you can probably replace the SplitText -> SplitText
-> RouteOnContent -> MergeContent chain with a single RouteText processor. RouteText
allows you to use Expression Language to route each line of text individually. So if,
for example, you have CSV coming in that looks like this:

id, name, dob, gender
1, John Doe, 11/02/1992, M
2, Jane Doe, 11/02/1993, F
3, Jacob Doe, 10/02/2014, M
4, Janine Doe, 02/05/2012, F

You could add a new property named 'boys' with a value of
${line:substringAfterLast(', '):equals('M')} and another property named 'girls'
with a value of ${line:substringAfterLast(', '):equals('F')}.

This will take the one incoming FlowFile and output two FlowFiles: one to the 'boys'
relationship containing the lines for John Doe and Jacob Doe, and one to the 'girls'
relationship containing the lines for Jane Doe and Janine Doe. This means the
FlowFiles never have to be split up and merged back together. It also bypasses the
RouteOnContent processor, which appears to be your bottleneck.
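
As a rough sketch, the RouteText configuration might look something like the
following, written in the same form as your MergeContent config quoted below. The
'boys' and 'girls' entries are dynamic properties you add yourself; please
double-check the exact property names and allowed values against the RouteText
documentation:

 <properties>
   <entry> <key>Routing Strategy</key> <value>Route to each matching Property Name</value> </entry>
   <entry> <key>Matching Strategy</key> <value>Satisfies Expression</value> </entry>
   <entry> <key>Character Set</key> <value>UTF-8</value> </entry>
   <entry> <key>Ignore Leading/Trailing Whitespace</key> <value>true</value> </entry>
   <!-- dynamic properties: each one becomes its own relationship -->
   <entry> <key>boys</key> <value>${line:substringAfterLast(', '):equals('M')}</value> </entry>
   <entry> <key>girls</key> <value>${line:substringAfterLast(', '):equals('F')}</value> </entry>
 </properties>

Any line that matches neither property (the header line, for instance) should end up
on the 'unmatched' relationship, so you can drop it or route it separately.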

Beyond this, there are a few processors on the 'master' branch right now, which I
presume will be released in NiFi 1.3.0, that may make this even easier: namely, the
PartitionRecord processor. Version 1.2.0 of NiFi introduced the notion of Record
Readers and Record Writers, and there is a CSV Reader and a CSV Writer. For example,
you could configure PartitionRecord with a property named 'gender' and a value of
'/gender'. This will automatically group together CSV records that have the same
value for the 'gender' column and will also add an attribute named 'gender' to each
outgoing FlowFile, with a value of either 'M' or 'F' in this case. You can then use
RouteOnAttribute to route as appropriate. The processor has a good bit of
documentation and examples, if that's a route you're interested in taking.
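
For instance, a PartitionRecord configuration might look roughly like this. The
'CSVReader' and 'CSVRecordSetWriter' values are placeholders for whatever controller
services you create (in a real template they would be referenced by ID), and 'gender'
is a dynamic property whose value is a RecordPath:

 <properties>
   <entry> <key>Record Reader</key> <value>CSVReader</value> </entry>
   <entry> <key>Record Writer</key> <value>CSVRecordSetWriter</value> </entry>
   <!-- dynamic property: the RecordPath to partition on -->
   <entry> <key>gender</key> <value>/gender</value> </entry>
 </properties>

A downstream RouteOnAttribute could then use dynamic properties such as
${gender:equals('M')} and ${gender:equals('F')} to send each partition wherever it
needs to go.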

I hope this helps!

-Mark




> On Jun 2, 2017, at 8:22 AM, Andrew Grande <apere...@gmail.com> wrote:
> 
> 1 vcore, which is not even a full core (a shared and oversubscribed cpu
> core). I'm not sure what you expected to see when you raised concurrency to
> 10 :)
> 
> There's a lot of things NiFi is doing behind the scenes, especially around
> provenance recording. I don't recommend anything below 4 cores to have
> meaningful experience. If in a cloud, go to 8 cores per VM, unless you are
> designing for a low footprint with MiNiFi.
> 
> Andrew
> 
> On Fri, Jun 2, 2017, 6:30 AM Martin Eden <martineden...@gmail.com> wrote:
> 
>> Thanks Andrew,
>> 
>> I have added UpdateAttribute processors to update the file names like you
>> said. Now it works, writing out 1MB files at a time (updated the
>> MergeContent MaxNumberOfEntries to 10000 to achieve that since each line in
>> my csv is 100 bytes).
>> 
>> The current flow is:
>> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
>> -> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
>> 
>> 
>>    -> MergeContent -> UpdateAttribute -> PutHDFS
>> 
>> 
>>    -> MergeContent -> UpdateAttribute -> PutHDFS
>> 
>> So now let's talk performance.
>> 
>> With a 1 node NiFi running on a Google Compute Engine instance with 1 core
>> and 3.7 GB RAM and a 20GB disk, when I feed one 300MB zip file
>> (uncompressed 2.5GB csv text) to this flow it is basically never finishing
>> the job of transferring all the data.
>> 
>> The inbound queue of RouteOnContent is always red and outbound queues are
>> mostly green so that indicates that this processor is the bottleneck. To
>> mitigate this I increased its number of concurrent tasks to 10 and then
>> observed tasks in progress 10, outbound queues temporarily red, avg task
>> latency increased from 2ms to 20ms, cpu on box maxed out to 100% by the
>> NiFi java process, load avg 5.
>> 
>> I then decreased the number of concurrent tasks of RouteOnContent to 5 and
>> the task average time dropped to about half as expected, with cpu still
>> 100% taken by the NiFi java process.
>> 
>> The RouteOnContent has 3 simple regexes that it applies.
>> 
>> Questions:
>> 
>> 1. Is it safe to say that I maxed out the performance of this flow on one
>> box with 1 core and 3.8 GB ram?
>> 
>> 2. The performance seems a lot lower than expected though which is
>> worrying. Is this expected? I am planning to do this at much larger scale,
>> hundreds of GBs.
>> 
>> 3. Is the RouteOnContent that I am using hitting NiFi hard? Is this not a
>> recommended use case? Is there anything obviously wrong in my flow?
>> Doing a bit of digging around in docs, presentations and other people's
>> experience it seems that NiFi's sweet spot is routing files based on
>> metadata (properties) and not really based on the actual contents of the
>> files.
>> 
>> 4. Is Nifi suitable for large scale ETL. Copying and doing simple massaging
>> of data from File System A to File System B? From Database A to Database B?
>> This is what I am evaluating it for.
>> 
>> I do see how running this on a box with more CPU and RAM, faster disks
>> (vertical scaling) would improve the performance and then adding another
>> node to the cluster. But I want to first validate the choice of
>> benchmarking flow and understand the performance on one box.
>> 
>> Thanks a lot for all the people for helping me on this thread on my NiFi
>> evaluation journey. This is a really big plus for community support of
>> NiFi.
>> 
>> M
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <apere...@gmail.com> wrote:
>> 
>>> It looks like your max bin size is 1000 and 10MB. Every time you hit
>> those,
>>> it will write out a merged file. Update tge filename attribute to be
>> unique
>>> before writing via PutHDFS.
>>> 
>>> Andrew
>>> 
>>> On Thu, Jun 1, 2017, 2:24 AM Martin Eden <martineden...@gmail.com>
>> wrote:
>>> 
>>>> Hi Joe,
>>>> 
>>>> Thanks for the explanations. Really useful in understanding how it
>> works.
>>>> Good to know that in the future this will be improved.
>>>> 
>>>> About the appending to HDFS issue let me recap. My flow is:
>>>> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) ->
>> SplitText(1)
>>>> -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
>>>> 
>>>> 
>>>>    -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
>>>> 
>>>> 
>>>>    -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
>>>> 
>>>> ListHDFS is monitoring an input folder where 300MB zip files are added
>>>> periodically. Each file uncompressed is 2.5 GB csv.
>>>> 
>>>> So I am writing out to hdfs from multiple PutHDFS processors all of
>> them
>>>> having conflict resolution set to *APPEND* and different output
>> folders.
>>>> 
>>>> The name of the file will be however the same *f.csv*. It gets picked
>> up
>>>> from the name of the flow files which bear the name of the original
>>>> uncompressed file. This happens I think in the MergeContent processor.
>>>> 
>>>> Since all of these processors are running with 1 concurrent task, it
>>> seems
>>>> that we cannot append concurrently to hdfs even if we are appending to
>>>> different files in different folders for some reason. Any ideas how to
>>>> mitigate this?
>>>> 
>>>> It seems other people have encountered this
>>>> <https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
>>>> with NiFi but there is no conclusive solution. It does seem also that
>>>> appending to hdfs is somewhat problematic
>>>> <http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.
>>>> 
>>>> So stepping back, the reason I am doing append in the PutHDFS is
>> because
>>> I
>>>> did not manage to find a setting in the MergeContent processors that
>>>> basically allows creation of multiple bundled flow files with the same
>>> root
>>>> name but different sequence numbers or timestamps (like f.csv.1,
>> f.csv.2
>>>> ....). They all get the same name which is f.csv. Is that possible
>>> somehow?
>>>> See my detailed MergeContent processor config below.
>>>> 
>>>> So basically I have a 2.5GB csv file that eventually gets broken up in
>>>> lines and the lines gets merged together in bundles of 10 MB but when
>>> those
>>>> bundles are emitted to the PutHDFS they have the same name as the
>>> original
>>>> file over and over again. I would like them to have a different name
>>> based
>>>> on a timestamp or sequence number let's say so that I can avoid the
>>> append
>>>> conflict resolution in PutHDFS which is causing me grief right now. Is
>>> that
>>>> possible?
>>>> 
>>>> Thanks,
>>>> M
>>>> 
>>>> 
>>>> Currently my MergeContent processor config is:
>>>>  <properties>
>>>> *   <entry> <key>Merge Strategy</key> <value>Bin-Packing
>>> Algorithm</value>
>>>> </entry>*
>>>> *   <entry> <key>Merge Format</key> <value>Binary Concatenation</value>
>>>> </entry>*
>>>>   <entry> <key>Attribute Strategy</key><value>Keep Only Common
>>>> Attributes</value> </entry>
>>>>   <entry> <key>Correlation Attribute Name</key> </entry>
>>>>   <entry> <key>Minimum Number of Entries</key><value>1</value>
>> </entry>
>>>>   <entry> <key>Maximum Number of Entries</key> <value>1000</value>
>>>> </entry>
>>>>   <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
>>>> *   <entry> <key>Maximum Group Size</key> <value>10 MB</value>
>> </entry>*
>>>>   <entry> <key>Max Bin Age</key> </entry>
>>>>   <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
>>>>   <entry> <key>Delimiter Strategy</key><value>Text</value> </entry>
>>>>   <entry> <key>Header File</key> </entry>
>>>>   <entry> <key>Footer File</key> </entry>
>>>>   <entry> <key>Demarcator File</key> <value></value> </entry>
>>>>   <entry> <key>Compression Level</key> <value>1</value></entry>
>>>>   <entry> <key>Keep Path</key> <value>false</value> </entry>
>>>>  </properties>
>>>> 
>>>> 
>>>> On Wed, May 31, 2017 at 3:52 PM, Joe Witt <joe.w...@gmail.com> wrote:
>>>> 
>>>>> Split failed before even with backpressure:
>>>>> - yes that backpressure kicks in when destination queues for a given
>>>>> processor have reached their target size (in count of flowfiles or
>>>>> total size represented).  However, to clarify why the OOM happened it
>>>>> is important to realize that it is not about 'flow files over a quick
>>>>> period of time' but rather 'flow files held within a single process
>>>>> session.  Your SplitText was pulling a single flowfile but then
>>>>> creating lets say 1,000,000 resulting flow files and then committing
>>>>> that change.  That happens within a session.  But all those flow file
>>>>> objects (not their content) are held in memory and at such high
>>>>> numbers it creates excessive heap usage.  The two phase
>> divide/conquer
>>>>> approach Koji suggested solves that and eventually we need to solve
>>>>> that by swapping out the flowfiles to disk within a session.  We
>>>>> actually do swap out flowfiles sitting on queues after a certain
>>>>> threshold is reached for this very reason.  This means you should be
>>>>> able to have many millions of flowfiles sitting around in the flow
>> for
>>>>> whatever reason and not hit memory problems.
>>>>> 
>>>>> Hope that helps there.
>>>>> 
>>>>> On PutHDFS it looks like possibly two things are trying to append to
>>>>> the same file?  If yes I'd really recommend not appending but rather
>>>>> use MergeContent to create data bundles of a given size then write
>>>>> those to HDFS.
>>>>> 
>>>>> Thanks
>>>>> Joe
>>>>> 
>>>>> On Wed, May 31, 2017 at 10:33 AM, Martin Eden <
>> martineden...@gmail.com
>>>> 
>>>>> wrote:
>>>>>> Hi Koji,
>>>>>> 
>>>>>> Good to know that it can handle large files. I thought it was the
>>> case
>>>>> but
>>>>>> I was just not seeing in practice.
>>>>>> 
>>>>>> Yes I am using 'Line Split Count' as 1 at SplitText.
>>>>>> 
>>>>>> I added the extra SplitText processor exactly as you suggested and
>>> the
>>>>> OOM
>>>>>> went away. So, big thanks!!!
>>>>>> 
>>>>>> However I have 2 follow-up questions:
>>>>>> 
>>>>>> 1. Before adding the extra SplitText processor I also played with
>> the
>>>>>> back-pressure settings on the outbound queue of the original
>>> SplitText
>>>>>> processor, since you mentioned that it is generating files at a
>> rate
>>>> that
>>>>>> is too high, I figure the queue should slow it down. I tried a
>> limit
>>> of
>>>>>> 100MB or 1000 files and I still got the OOMs in the SplitText
>>>> processor.
>>>>>> Why isn't the queue back-pressure helping me in this case? Where
>>> would
>>>>> that
>>>>>> come in handy then? Why id the extra SplitText processor needed to
>>> fix
>>>>>> things and not just the queue back-pressure?
>>>>>> 
>>>>>> 2. I am now close to completing my flow but I am hitting another
>>> error.
>>>>>> This time it's the last stage, the PutHDFS throws
>>>>>> o.apache.nifi.processors.hadoop.PutHDFS
>>>>>> PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
>>>> HDFS
>>>>>> due to org.apache.nifi.processor.exception.ProcessException:
>>>> IOException
>>>>>> thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
>>>>>> See the full stacktrace below.
>>>>>> I have a parallelism of 1 for my PutHDFS processors. Any ideas why
>>> this
>>>>> is
>>>>>> happening?
>>>>>> 
>>>>>> Thanks,
>>>>>> Martin
>>>>>> 
>>>>>> 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
>>>>>> o.apache.nifi.processors.hadoop.PutHDFS
>>>>>> PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
>>>>>> 
>>>>>> ailed to write to HDFS due to
>>>>>> org.apache.nifi.processor.exception.ProcessException: IOException
>>>> thrown
>>>>>> from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
>>>>>> 
>>>>>> 5aa]:
>>>>>> org.apache.hadoop.ipc.RemoteException(org.apache.
>>> hadoop.hdfs.protocol.
>>>>> AlreadyBeingCreatedException):
>>>>>> Failed to APPEND_FILE /nifi_out/unmatched/log
>>>>>> 
>>>>>> 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on
>> 10.128.0.7
>>>>>> because DFSClient_NONMAPREDUCE_-1411681085_97 is already the
>> current
>>>>> lease
>>>>>> holder.
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> recoverLeaseInternal(FSNamesystem.java:2882)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>> appendFileInternal(
>>>>> FSNamesystem.java:2683)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> appendFileInt(FSNamesystem.java:2982)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> appendFile(FSNamesystem.java:2950)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
>>>>> append(NameNodeRpcServer.java:655)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
>>>>> deTranslatorPB.append(ClientNamenodeProtocolServerSi
>>>>> deTranslatorPB.java:421)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
>>>>> 
>>>> ClientNamenodeProtocol$2.callBlockingMethod(
>>> ClientNamenodeProtocolProtos.j
>>>>>> 
>>>>>> ava)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
>>> ProtoBufRpcInvoker.call(
>>>>> ProtobufRpcEngine.java:616)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
>>> 2049)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
>>> 2045)
>>>>>> 
>>>>>>        at java.security.AccessController.doPrivileged(Native
>>> Method)
>>>>>> 
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(
>>>>> UserGroupInformation.java:1698)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$
>>> Handler.run(Server.java:2043)
>>>>>> 
>>>>>> : {}
>>>>>> 
>>>>>> org.apache.nifi.processor.exception.ProcessException: IOException
>>>> thrown
>>>>>> from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
>>>>>> org.apache.hadoop.ipc.Re
>>>>>> 
>>>>>> moteException(org.apache.hadoop.hdfs.protocol.
>>>>> AlreadyBeingCreatedException):
>>>>>> Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
>>>>> DFSClient_NON
>>>>>> 
>>>>>> MAPREDUCE_-1411681085_97 on 10.128.0.7 because
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
>>>>> holder.
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> recoverLeaseInternal(FSNamesystem.java:2882)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>> appendFileInternal(
>>>>> FSNamesystem.java:2683)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> appendFileInt(FSNamesystem.java:2982)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> appendFile(FSNamesystem.java:2950)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
>>>>> append(NameNodeRpcServer.java:655)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
>>>>> deTranslatorPB.append(ClientNamenodeProtocolServerSi
>>>>> deTranslatorPB.java:421)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
>>>>> 
>>>> ClientNamenodeProtocol$2.callBlockingMethod(
>>> ClientNamenodeProtocolProtos.j
>>>>>> 
>>>>>> ava)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
>>> ProtoBufRpcInvoker.call(
>>>>> ProtobufRpcEngine.java:616)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
>>> 2049)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
>>> 2045)
>>>>>> 
>>>>>>        at java.security.AccessController.doPrivileged(Native
>>> Method)
>>>>>> 
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(
>>>>> UserGroupInformation.java:1698)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$
>>> Handler.run(Server.java:2043)
>>>>>> 
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.repository.StandardProcessSession.read(
>>>>> StandardProcessSession.java:2148)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.repository.StandardProcessSession.read(
>>>>> StandardProcessSession.java:2095)
>>>>>> 
>>>>>>        at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.
>>>>> java:293)
>>>>>> 
>>>>>>        at java.security.AccessController.doPrivileged(Native
>>> Method)
>>>>>> 
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:360)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(
>>>>> UserGroupInformation.java:1678)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(
>>> PutHDFS.java:223)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.processor.AbstractProcessor.onTrigger(
>>>>> AbstractProcessor.java:27)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>>>>> StandardProcessorNode.java:1118)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
>>>>> ContinuallyRunProcessorTask.java:144)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
>>>>> ContinuallyRunProcessorTask.java:47)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.
>>> run(
>>>>> TimerDrivenSchedulingAgent.java:132)
>>>>>> 
>>>>>>        at
>>>>>> java.util.concurrent.Executors$RunnableAdapter.
>>> call(Executors.java:511)
>>>>>> 
>>>>>>        at java.util.concurrent.FutureTask.runAndReset(
>>>>> FutureTask.java:308)
>>>>>> 
>>>>>>        at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$
>>>>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>>>>> 
>>>>>>        at
>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$
>>>>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>> 
>>>>>>        at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>> ThreadPoolExecutor.java:1142)
>>>>>> 
>>>>>>        at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>> ThreadPoolExecutor.java:617)
>>>>>> 
>>>>>>        at java.lang.Thread.run(Thread.java:748)
>>>>>> 
>>>>>> Caused by: org.apache.hadoop.ipc.RemoteException: Failed to
>>> APPEND_FILE
>>>>>> /nifi_out/unmatched/log20160930.csv for
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
>>>>>> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
>>>>> holder.
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> recoverLeaseInternal(FSNamesystem.java:2882)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>> appendFileInternal(
>>>>> FSNamesystem.java:2683)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> appendFileInt(FSNamesystem.java:2982)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
>>>>> appendFile(FSNamesystem.java:2950)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
>>>>> append(NameNodeRpcServer.java:655)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
>>>>> deTranslatorPB.append(ClientNamenodeProtocolServerSi
>>>>> deTranslatorPB.java:421)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
>>>>> ClientNamenodeProtocol$2.callBlockingMethod(
>>> ClientNamenodeProtocolProtos.
>>>>> java)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
>>> ProtoBufRpcInvoker.call(
>>>>> ProtobufRpcEngine.java:616)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
>>> 2049)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
>>> 2045)
>>>>>> 
>>>>>>        at java.security.AccessController.doPrivileged(Native
>>> Method)
>>>>>> 
>>>>>>        at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.security.UserGroupInformation.doAs(
>>>>> UserGroupInformation.java:1698)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Server$
>>> Handler.run(Server.java:2043)
>>>>>> 
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>>>>>> 
>>>>>>        at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
>>>>> invoke(ProtobufRpcEngine.java:229)
>>>>>> 
>>>>>>        at com.sun.proxy.$Proxy188.append(Unknown Source)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
>>>>> orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
>>>>>> 
>>>>>>        at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown
>>>> Source)
>>>>>> 
>>>>>>        at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>>> 
>>>>>>        at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
>>>>> RetryInvocationHandler.java:191)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
>>>>> RetryInvocationHandler.java:102)
>>>>>> 
>>>>>>        at com.sun.proxy.$Proxy194.append(Unknown Source)
>>>>>> 
>>>>>>        at org.apache.hadoop.hdfs.DFSClient.callAppend(
>>>>> DFSClient.java:1808)
>>>>>> 
>>>>>>        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
>>> java:1877)
>>>>>> 
>>>>>>        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
>>> java:1847)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$4.
>>>>> doCall(DistributedFileSystem.java:340)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem$4.
>>>>> doCall(DistributedFileSystem.java:336)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
>>>>> FileSystemLinkResolver.java:81)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.append(
>>>>> DistributedFileSystem.java:348)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.hadoop.hdfs.DistributedFileSystem.append(
>>>>> DistributedFileSystem.java:318)
>>>>>> 
>>>>>>        at org.apache.hadoop.fs.FileSystem.append(FileSystem.
>>> java:1176)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(
>>> PutHDFS.java:301)
>>>>>> 
>>>>>>        at
>>>>>> org.apache.nifi.controller.repository.StandardProcessSession.read(
>>>>> StandardProcessSession.java:2125)
>>>>>> 
>>>>>>        ... 18 common frames omitted
>>>>>> 
>>>>>> On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <
>>>> ijokaruma...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Martin,
>>>>>>> 
>>>>>>> Generally, NiFi processor doesn't load entire content of file and
>> is
>>>>>>> capable of handling huge files.
>>>>>>> However, having massive amount of FlowFiles can cause OOM issue as
>>>>>>> FlowFiles and its Attributes resides on heap.
>>>>>>> 
>>>>>>> I assume you are using 'Line Split Count' as 1 at SplitText.
>>>>>>> We recommend to use multiple SplitText processors to not generate
>>> many
>>>>>>> FlowFiles in a short period of time.
>>>>>>> For example, 1st SplitText splits files per 5,000 lines, then the
>>> 2nd
>>>>>>> SplitText splits into each line.
>>>>>>> This way, we can decrease number of FlowFiles at a given time
>>>>>>> requiring less heap.
>>>>>>> 
>>>>>>> I hope this helps.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Koji
>>>>>>> 
>>>>>>> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <
>>> martineden...@gmail.com
>>>>> 
>>>>>>> wrote:
>>>>>>>> Hi all,
>>>>>>>> 
>>>>>>>> I have a vanilla Nifi 1.2.0 node with 1GB of heap.
>>>>>>>> 
>>>>>>>> The flow I am trying to run is:
>>>>>>>> ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent ->
>>> MergeContent
>>>>> ->
>>>>>>>> PutHDFS
>>>>>>>> 
>>>>>>>> When I give it a 300MB input zip file (2.5GB uncompressed) I am
>>>>> getting
>>>>>>>> Java OutOfMemoryError as below.
>>>>>>>> 
>>>>>>>> Does NiFi read in the entire contents of files in memory? This
>> is
>>>>>>>> unexpected. I thought it is chunking through files. Giving more
>>> ram
>>>> is
>>>>>>> not
>>>>>>>> a solution as you can always get larger input files in the
>> future.
>>>>>>>> 
>>>>>>>> Does this mean NiFi is not suitable as a scalable ETL solution?
>>>>>>>> 
>>>>>>>> Can someone please explain what is happening and how to mitigate
>>>> large
>>>>>>>> files in NiFi? Any patterns?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> M
>>>>>>>> 
>>>>>>>> ERROR [Timer-Driven Process Thread-9]
>>>>>>>> o.a.nifi.processors.standard.SplitText
>>>>>>>> SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
>>>>>>>> SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to
>>> process
>>>>>>>> session due to java.lang.OutOfMemoryError: Java heap space: {}
>>>>>>>> 
>>>>>>>> java.lang.OutOfMemoryError: Java heap space
>>>>>>>> 
>>>>>>>>        at
>> java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
>>>>>>>> 
>>>>>>>>        at java.util.HashMap.putMapEntries(HashMap.java:511)
>>>>>>>> 
>>>>>>>>        at java.util.HashMap.<init>(HashMap.java:489)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.repository.StandardFlowFileRecord$
>>>>>>> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.repository.StandardFlowFileRecord$
>>>>>>> Builder.addAttributes(StandardFlowFileRecord.java:234)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.repository.StandardProcessSession.
>>>>>>> putAllAttributes(StandardProcessSession.java:1723)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.processors.standard.SplitText.
>>>>>>> updateAttributes(SplitText.java:367)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> 
>>>> org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
>>>>>>> SplitText.java:320)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.processors.standard.SplitText.onTrigger(
>>>>>>> SplitText.java:258)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.processor.AbstractProcessor.onTrigger(
>>>>>>> AbstractProcessor.java:27)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>>>>>>> StandardProcessorNode.java:1118)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>>> call(
>>>>>>> ContinuallyRunProcessorTask.java:144)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
>>> call(
>>>>>>> ContinuallyRunProcessorTask.java:47)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> org.apache.nifi.controller.scheduling.
>>> TimerDrivenSchedulingAgent$1.
>>>>> run(
>>>>>>> TimerDrivenSchedulingAgent.java:132)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.
>>>>> call(Executors.java:511)
>>>>>>>> 
>>>>>>>>        at java.util.concurrent.FutureTask.runAndReset(
>>>>>>> FutureTask.java:308)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$
>>>>>>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
>>> java:180)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$
>>>>>>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>>>>> ThreadPoolExecutor.java:1142)
>>>>>>>> 
>>>>>>>>        at
>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(
>>>>>>> ThreadPoolExecutor.java:617)
>>>>>>>> 
>>>>>>>>        at java.lang.Thread.run(Thread.java:748)
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
