Thanks Andrew,

I have added UpdateAttribute processors to make the file names unique, as
you suggested. Now it works, writing out 1 MB files at a time (I updated the
MergeContent Maximum Number of Entries to 10000 to achieve that, since each
line in my csv is about 100 bytes).
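
For reference, the only MergeContent change relative to the config quoted
further down in this thread is the entry count (10,000 lines x ~100 bytes
per line comes out at roughly 1 MB per bundle):

   <entry> <key>Maximum Number of Entries</key> <value>10000</value> </entry>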

The current flow is:

ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
-> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
                  -> MergeContent -> UpdateAttribute -> PutHDFS
                  -> MergeContent -> UpdateAttribute -> PutHDFS
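
Each UpdateAttribute just rewrites the filename attribute so that every
merged bundle gets a unique name before it reaches PutHDFS. As an
illustration, a single dynamic property along these lines does the job (the
expression shown is only an example: it appends the flow file's uuid core
attribute, and a timestamp via ${now():toNumber()} would work equally well):

   <entry> <key>filename</key> <value>${filename}.${uuid}</value> </entry>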

So now let's talk performance.

With a single-node NiFi running on a Google Compute Engine instance with 1
core, 3.7 GB RAM and a 20 GB disk, when I feed one 300 MB zip file (2.5 GB
of uncompressed csv text) into this flow, it basically never finishes
transferring all the data.

The inbound queue of RouteOnContent is always red and its outbound queues
are mostly green, which indicates that this processor is the bottleneck. To
mitigate this I increased its number of concurrent tasks to 10 and then
observed: 10 tasks in progress, outbound queues temporarily red, average
task latency up from 2 ms to 20 ms, CPU on the box maxed out at 100% by the
NiFi java process, and a load average of 5.

I then decreased the number of concurrent tasks of RouteOnContent to 5 and
the average task time dropped to about half, as expected, with the CPU
still 100% consumed by the NiFi java process.

The RouteOnContent applies 3 simple regexes to each flow file.
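
For context: after SplitText(1), the 2.5 GB csv becomes roughly 25 million
single-line flow files (at ~100 bytes per line), and each of them is matched
against all 3 patterns. RouteOnContent routing is driven by dynamic
properties (property name = relationship, value = regex), so the relevant
part of the config looks roughly like the sketch below (route names,
patterns and the Match Requirement value are placeholders for illustration,
not my actual settings):

  <properties>
   <entry> <key>Match Requirement</key> <value>content must contain match</value> </entry>
   <entry> <key>matched_a</key> <value>^A,</value> </entry>
   <entry> <key>matched_b</key> <value>^B,</value> </entry>
   <entry> <key>matched_c</key> <value>^C,</value> </entry>
  </properties>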

Questions:

1. Is it safe to say that I have maxed out the performance of this flow on
one box with 1 core and 3.7 GB RAM?

2. The performance seems a lot lower than expected, though, which is
worrying. Is this level of throughput expected? I am planning to do this at
a much larger scale, hundreds of GBs.

3. Is the RouteOnContent I am using hitting NiFi hard? Is this not a
recommended use case? Is there anything obviously wrong in my flow? From a
bit of digging around in docs, presentations and other people's experience,
it seems that NiFi's sweet spot is routing files based on metadata
(attributes) rather than on the actual contents of the files.

4. Is NiFi suitable for large-scale ETL, i.e. copying and lightly massaging
data from file system A to file system B, or from database A to database B?
This is what I am evaluating it for.

I do see how running this on a box with more CPU, more RAM and faster disks
(vertical scaling) would improve performance, as would adding more nodes to
the cluster. But I first want to validate the choice of benchmarking flow
and understand the performance on a single box.

Thanks a lot to everyone who is helping me on this thread during my NiFi
evaluation journey. The community support is a really big plus for NiFi.

M

On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <apere...@gmail.com> wrote:

> It looks like your max bin size is 1000 entries and 10MB. Every time you
> hit those, it will write out a merged file. Update the filename attribute
> to be unique before writing via PutHDFS.
>
> Andrew
>
> On Thu, Jun 1, 2017, 2:24 AM Martin Eden <martineden...@gmail.com> wrote:
>
> > Hi Joe,
> >
> > Thanks for the explanations. Really useful in understanding how it works.
> > Good to know that in the future this will be improved.
> >
> > About the appending to HDFS issue let me recap. My flow is:
> > ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> > -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
> >                   -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
> >                   -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
> >
> > ListHDFS is monitoring an input folder where 300MB zip files are added
> > periodically. Each file uncompressed is 2.5 GB csv.
> >
> > So I am writing out to hdfs from multiple PutHDFS processors all of them
> > having conflict resolution set to *APPEND* and different output folders.
> >
> > The name of the file will be however the same *f.csv*. It gets picked up
> > from the name of the flow files which bear the name of the original
> > uncompressed file. This happens I think in the MergeContent processor.
> >
> > Since all of these processors are running with 1 concurrent task, it
> seems
> > that we cannot append concurrently to hdfs even if we are appending to
> > different files in different folders for some reason. Any ideas how to
> > mitigate this?
> >
> > It seems other people have encountered this
> > <https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
> > with NiFi, but there is no conclusive solution. It does seem also that
> > appending to hdfs is somewhat problematic
> > <http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.
> >
> > So stepping back, the reason I am doing append in the PutHDFS is because
> I
> > did not manage to find a setting in the MergeContent processors that
> > basically allows creation of multiple bundled flow files with the same
> root
> > name but different sequence numbers or timestamps (like f.csv.1, f.csv.2
> > ....). They all get the same name which is f.csv. Is that possible
> somehow?
> > See my detailed MergeContent processor config below.
> >
> > So basically I have a 2.5GB csv file that eventually gets broken up in
> > lines and the lines gets merged together in bundles of 10 MB but when
> those
> > bundles are emitted to the PutHDFS they have the same name as the
> original
> > file over and over again. I would like them to have a different name
> based
> > on a timestamp or sequence number let's say so that I can avoid the
> append
> > conflict resolution in PutHDFS which is causing me grief right now. Is
> that
> > possible?
> >
> > Thanks,
> > M
> >
> >
> > Currently my MergeContent processor config is:
> >   <properties>
> > *   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>*
> > *   <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>*
> >    <entry> <key>Attribute Strategy</key> <value>Keep Only Common Attributes</value> </entry>
> >    <entry> <key>Correlation Attribute Name</key> </entry>
> >    <entry> <key>Minimum Number of Entries</key> <value>1</value> </entry>
> >    <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
> >    <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> > *   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>*
> >    <entry> <key>Max Bin Age</key> </entry>
> >    <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
> >    <entry> <key>Delimiter Strategy</key> <value>Text</value> </entry>
> >    <entry> <key>Header File</key> </entry>
> >    <entry> <key>Footer File</key> </entry>
> >    <entry> <key>Demarcator File</key> <value></value> </entry>
> >    <entry> <key>Compression Level</key> <value>1</value> </entry>
> >    <entry> <key>Keep Path</key> <value>false</value> </entry>
> >   </properties>
> >
> >
> > On Wed, May 31, 2017 at 3:52 PM, Joe Witt <joe.w...@gmail.com> wrote:
> >
> > > Split failed before even with backpressure:
> > > - yes that backpressure kicks in when destination queues for a given
> > > processor have reached their target size (in count of flowfiles or
> > > total size represented).  However, to clarify why the OOM happened it
> > > is important to realize that it is not about 'flow files over a quick
> > > period of time' but rather 'flow files held within a single process
> > > session.  Your SplitText was pulling a single flowfile but then
> > > creating lets say 1,000,000 resulting flow files and then committing
> > > that change.  That happens within a session.  But all those flow file
> > > objects (not their content) are held in memory and at such high
> > > numbers it creates excessive heap usage.  The two phase divide/conquer
> > > approach Koji suggested solves that and eventually we need to solve
> > > that by swapping out the flowfiles to disk within a session.  We
> > > actually do swap out flowfiles sitting on queues after a certain
> > > threshold is reached for this very reason.  This means you should be
> > > able to have many millions of flowfiles sitting around in the flow for
> > > whatever reason and not hit memory problems.
> > >
> > > Hope that helps there.
> > >
> > > On PutHDFS it looks like possibly two things are trying to append to
> > > the same file?  If yes I'd really recommend not appending but rather
> > > use MergeContent to create data bundles of a given size then write
> > > those to HDFS.
> > >
> > > Thanks
> > > Joe
> > >
> > > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <martineden...@gmail.com
> >
> > > wrote:
> > > > Hi Koji,
> > > >
> > > > Good to know that it can handle large files. I thought it was the
> case
> > > but
> > > > I was just not seeing in practice.
> > > >
> > > > Yes I am using 'Line Split Count' as 1 at SplitText.
> > > >
> > > > I added the extra SplitText processor exactly as you suggested and
> the
> > > OOM
> > > > went away. So, big thanks!!!
> > > >
> > > > However I have 2 follow-up questions:
> > > >
> > > > 1. Before adding the extra SplitText processor I also played with the
> > > > back-pressure settings on the outbound queue of the original
> SplitText
> > > > processor, since you mentioned that it is generating files at a rate
> > that
> > > > is too high, I figure the queue should slow it down. I tried a limit
> of
> > > > 100MB or 1000 files and I still got the OOMs in the SplitText
> > processor.
> > > > Why isn't the queue back-pressure helping me in this case? Where
> would
> > > that
> > > > come in handy then? Why id the extra SplitText processor needed to
> fix
> > > > things and not just the queue back-pressure?
> > > >
> > > > 2. I am now close to completing my flow but I am hitting another
> error.
> > > > This time it's the last stage, the PutHDFS throws
> > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to
> > HDFS
> > > > due to org.apache.nifi.processor.exception.ProcessException:
> > IOException
> > > > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > > See the full stacktrace below.
> > > > I have a parallelism of 1 for my PutHDFS processors. Any ideas why
> this
> > > is
> > > > happening?
> > > >
> > > > Thanks,
> > > > Martin
> > > >
> > > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
> > > >
> > > > ailed to write to HDFS due to
> > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown
> > > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > >
> > > > 5aa]:
> > > > org.apache.hadoop.ipc.RemoteException(org.apache.
> hadoop.hdfs.protocol.
> > > AlreadyBeingCreatedException):
> > > > Failed to APPEND_FILE /nifi_out/unmatched/log
> > > >
> > > > 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
> > > > because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current
> > > lease
> > > > holder.
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInternal(
> > > FSNamesystem.java:2683)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFileInt(FSNamesystem.java:2982)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFile(FSNamesystem.java:2950)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > append(NameNodeRpcServer.java:655)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > deTranslatorPB.java:421)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > >
> > ClientNamenodeProtocol$2.callBlockingMethod(
> ClientNamenodeProtocolProtos.j
> > > >
> > > > ava)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> ProtoBufRpcInvoker.call(
> > > ProtobufRpcEngine.java:616)
> > > >
> > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2049)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2045)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1698)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$
> Handler.run(Server.java:2043)
> > > >
> > > > : {}
> > > >
> > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown
> > > > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > > org.apache.hadoop.ipc.Re
> > > >
> > > > moteException(org.apache.hadoop.hdfs.protocol.
> > > AlreadyBeingCreatedException):
> > > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > > DFSClient_NON
> > > >
> > > > MAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > holder.
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInternal(
> > > FSNamesystem.java:2683)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFileInt(FSNamesystem.java:2982)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFile(FSNamesystem.java:2950)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > append(NameNodeRpcServer.java:655)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > deTranslatorPB.java:421)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > >
> > ClientNamenodeProtocol$2.callBlockingMethod(
> ClientNamenodeProtocolProtos.j
> > > >
> > > > ava)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> ProtoBufRpcInvoker.call(
> > > ProtobufRpcEngine.java:616)
> > > >
> > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2049)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2045)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1698)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$
> Handler.run(Server.java:2043)
> > > >
> > > >
> > > >         at
> > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > StandardProcessSession.java:2148)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > StandardProcessSession.java:2095)
> > > >
> > > >         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.
> > > java:293)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:360)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1678)
> > > >
> > > >         at
> > > > org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(
> PutHDFS.java:223)
> > > >
> > > >         at
> > > > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > > AbstractProcessor.java:27)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > > StandardProcessorNode.java:1118)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > > ContinuallyRunProcessorTask.java:144)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> > > ContinuallyRunProcessorTask.java:47)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.
> run(
> > > TimerDrivenSchedulingAgent.java:132)
> > > >
> > > >         at
> > > > java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
> > > >
> > > >         at java.util.concurrent.FutureTask.runAndReset(
> > > FutureTask.java:308)
> > > >
> > > >         at
> > > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > >
> > > >         at
> > > > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > >
> > > >         at
> > > > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > ThreadPoolExecutor.java:1142)
> > > >
> > > >         at
> > > > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > ThreadPoolExecutor.java:617)
> > > >
> > > >         at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to
> APPEND_FILE
> > > > /nifi_out/unmatched/log20160930.csv for
> > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> > > holder.
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > recoverLeaseInternal(FSNamesystem.java:2882)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInternal(
> > > FSNamesystem.java:2683)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFileInt(FSNamesystem.java:2982)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> > > appendFile(FSNamesystem.java:2950)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> > > append(NameNodeRpcServer.java:655)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> > > deTranslatorPB.append(ClientNamenodeProtocolServerSi
> > > deTranslatorPB.java:421)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> > > ClientNamenodeProtocol$2.callBlockingMethod(
> ClientNamenodeProtocolProtos.
> > > java)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$
> ProtoBufRpcInvoker.call(
> > > ProtobufRpcEngine.java:616)
> > > >
> > > >         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2049)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:
> 2045)
> > > >
> > > >         at java.security.AccessController.doPrivileged(Native
> Method)
> > > >
> > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > >
> > > >         at
> > > > org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1698)
> > > >
> > > >         at org.apache.hadoop.ipc.Server$
> Handler.run(Server.java:2043)
> > > >
> > > >
> > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > > >
> > > >         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > > >
> > > >         at
> > > > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> > > invoke(ProtobufRpcEngine.java:229)
> > > >
> > > >         at com.sun.proxy.$Proxy188.append(Unknown Source)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> > > orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > > >
> > > >         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown
> > Source)
> > > >
> > > >         at
> > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > DelegatingMethodAccessorImpl.java:43)
> > > >
> > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > >
> > > >         at
> > > > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> > > RetryInvocationHandler.java:191)
> > > >
> > > >         at
> > > > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> > > RetryInvocationHandler.java:102)
> > > >
> > > >         at com.sun.proxy.$Proxy194.append(Unknown Source)
> > > >
> > > >         at org.apache.hadoop.hdfs.DFSClient.callAppend(
> > > DFSClient.java:1808)
> > > >
> > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
> java:1877)
> > > >
> > > >         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.
> java:1847)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > > doCall(DistributedFileSystem.java:340)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> > > doCall(DistributedFileSystem.java:336)
> > > >
> > > >         at
> > > > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> > > FileSystemLinkResolver.java:81)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > > DistributedFileSystem.java:348)
> > > >
> > > >         at
> > > > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> > > DistributedFileSystem.java:318)
> > > >
> > > >         at org.apache.hadoop.fs.FileSystem.append(FileSystem.
> java:1176)
> > > >
> > > >         at
> > > > org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(
> PutHDFS.java:301)
> > > >
> > > >         at
> > > > org.apache.nifi.controller.repository.StandardProcessSession.read(
> > > StandardProcessSession.java:2125)
> > > >
> > > >         ... 18 common frames omitted
> > > >
> > > > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <
> > ijokaruma...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Martin,
> > > >>
> > > >> Generally, NiFi processor doesn't load entire content of file and is
> > > >> capable of handling huge files.
> > > >> However, having massive amount of FlowFiles can cause OOM issue as
> > > >> FlowFiles and its Attributes resides on heap.
> > > >>
> > > >> I assume you are using 'Line Split Count' as 1 at SplitText.
> > > >> We recommend to use multiple SplitText processors to not generate
> many
> > > >> FlowFiles in a short period of time.
> > > >> For example, 1st SplitText splits files per 5,000 lines, then the
> 2nd
> > > >> SplitText splits into each line.
> > > >> This way, we can decrease number of FlowFiles at a given time
> > > >> requiring less heap.
> > > >>
> > > >> I hope this helps.
> > > >>
> > > >> Thanks,
> > > >> Koji
> > > >>
> > > >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <
> martineden...@gmail.com
> > >
> > > >> wrote:
> > > >> > Hi all,
> > > >> >
> > > >> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> > > >> >
> > > >> > The flow I am trying to run is:
> > > >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent ->
> MergeContent
> > > ->
> > > >> > PutHDFS
> > > >> >
> > > >> > When I give it a 300MB input zip file (2.5GB uncompressed) I am
> > > getting
> > > >> > Java OutOfMemoryError as below.
> > > >> >
> > > >> > Does NiFi read in the entire contents of files in memory? This is
> > > >> > unexpected. I thought it is chunking through files. Giving more
> ram
> > is
> > > >> not
> > > >> > a solution as you can always get larger input files in the future.
> > > >> >
> > > >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> > > >> >
> > > >> > Can someone please explain what is happening and how to mitigate
> > large
> > > >> > files in NiFi? Any patterns?
> > > >> >
> > > >> > Thanks,
> > > >> > M
> > > >> >
> > > >> > ERROR [Timer-Driven Process Thread-9]
> > > >> > o.a.nifi.processors.standard.SplitText
> > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to
> process
> > > >> > session due to java.lang.OutOfMemoryError: Java heap space: {}
> > > >> >
> > > >> > java.lang.OutOfMemoryError: Java heap space
> > > >> >
> > > >> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> > > >> >
> > > >> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
> > > >> >
> > > >> >         at java.util.HashMap.<init>(HashMap.java:489)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> > > >> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> > > >> Builder.addAttributes(StandardFlowFileRecord.java:234)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.repository.StandardProcessSession.
> > > >> putAllAttributes(StandardProcessSession.java:1723)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.processors.standard.SplitText.
> > > >> updateAttributes(SplitText.java:367)
> > > >> >
> > > >> >         at
> > > >> >
> > org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
> > > >> SplitText.java:320)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.processors.standard.SplitText.onTrigger(
> > > >> SplitText.java:258)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> > > >> AbstractProcessor.java:27)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> > > >> StandardProcessorNode.java:1118)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(
> > > >> ContinuallyRunProcessorTask.java:144)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.
> call(
> > > >> ContinuallyRunProcessorTask.java:47)
> > > >> >
> > > >> >         at
> > > >> > org.apache.nifi.controller.scheduling.
> TimerDrivenSchedulingAgent$1.
> > > run(
> > > >> TimerDrivenSchedulingAgent.java:132)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.Executors$RunnableAdapter.
> > > call(Executors.java:511)
> > > >> >
> > > >> >         at java.util.concurrent.FutureTask.runAndReset(
> > > >> FutureTask.java:308)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.
> java:180)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> > > >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> > > >> ThreadPoolExecutor.java:1142)
> > > >> >
> > > >> >         at
> > > >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> > > >> ThreadPoolExecutor.java:617)
> > > >> >
> > > >> >         at java.lang.Thread.run(Thread.java:748)
> > > >>
> > >
> >
>
