It looks like your max bin size is 1000 entries and 10 MB. Every time you hit either of those, it will write out a merged file. Update the filename attribute to be unique (for example with an UpdateAttribute processor placed after MergeContent) before writing via PutHDFS.
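
To make the idea concrete, here is a rough sketch in Python rather than NiFi itself. It models a bin that closes at 1000 entries or 10 MB, and gives each merged bundle a distinct name derived from the original one. The function names and the sequence-number suffix are illustrative assumptions, not NiFi APIs, and the real MergeContent binning has more rules than this. In the flow itself the same effect would typically come from setting filename to something like ${filename}.${UUID()} in UpdateAttribute.

```python
from typing import Iterable, Iterator, List, Tuple

MAX_ENTRIES = 1000             # "Maximum Number of Entries" from the quoted config
MAX_BYTES = 10 * 1024 * 1024   # "Maximum Group Size" of 10 MB


def merge_bins(lines: Iterable[bytes]) -> Iterator[bytes]:
    """Concatenate lines into bundles, closing a bundle at either limit.

    Simplified model of bin packing, not the MergeContent implementation.
    """
    current: List[bytes] = []
    size = 0
    for line in lines:
        # Close the bin first if adding this line would exceed the size cap.
        if current and size + len(line) > MAX_BYTES:
            yield b"".join(current)
            current, size = [], 0
        current.append(line)
        size += len(line)
        # Close the bin as soon as the entry cap is reached.
        if len(current) >= MAX_ENTRIES:
            yield b"".join(current)
            current, size = [], 0
    if current:
        yield b"".join(current)


def unique_names(base: str, bundles: Iterable[bytes]) -> Iterator[Tuple[str, bytes]]:
    """Pair each bundle with a distinct name such as f.csv.1, f.csv.2, ..."""
    for seq, bundle in enumerate(bundles, start=1):
        yield f"{base}.{seq}", bundle


if __name__ == "__main__":
    rows = (b"a,b,c\n" for _ in range(2500))
    for name, data in unique_names("f.csv", merge_bins(rows)):
        print(name, len(data))
```

With distinct names per bundle, PutHDFS no longer needs the append conflict resolution, because no two bundles target the same HDFS path.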

Andrew

On Thu, Jun 1, 2017, 2:24 AM Martin Eden <martineden...@gmail.com> wrote:

> Hi Joe,
>
> Thanks for the explanations. Really useful in understanding how it works.
> Good to know that in the future this will be improved.
>
> About the appending to HDFS issue let me recap. My flow is:
> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
>                   -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
>                   -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
>
> ListHDFS is monitoring an input folder where 300MB zip files are added
> periodically. Each file uncompressed is 2.5 GB csv.
>
> So I am writing out to hdfs from multiple PutHDFS processors all of them
> having conflict resolution set to *APPEND* and different output folders.
>
> The name of the file will be however the same *f.csv*. It gets picked up
> from the name of the flow files which bear the name of the original
> uncompressed file. This happens I think in the MergeContent processor.
>
> Since all of these processors are running with 1 concurrent task, it seems
> that we cannot append concurrently to hdfs even if we are appending to
> different files in different folders for some reason. Any ideas how to
> mitigate this?
>
> It seems other people have encountered this
> <https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
> with NiFi but there is no conclusive solution. It does seem also that
> appending to hdfs is somewhat problematic
> <http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.
>
> So stepping back, the reason I am doing append in the PutHDFS is because I
> did not manage to find a setting in the MergeContent processors that
> basically allows creation of multiple bundled flow files with the same root
> name but different sequence numbers or timestamps (like f.csv.1, f.csv.2
> ....). They all get the same name which is f.csv. Is that possible somehow?
> See my detailed MergeContent processor config below.
>
> So basically I have a 2.5GB csv file that eventually gets broken up in
> lines and the lines gets merged together in bundles of 10 MB but when those
> bundles are emitted to the PutHDFS they have the same name as the original
> file over and over again. I would like them to have a different name based
> on a timestamp or sequence number let's say so that I can avoid the append
> conflict resolution in PutHDFS which is causing me grief right now. Is that
> possible?
>
> Thanks,
> M
>
>
> Currently my MergeContent processor config is:
> <properties>
> * <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>*
> * <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>*
> <entry> <key>Attribute Strategy</key><value>Keep Only Common Attributes</value> </entry>
> <entry> <key>Correlation Attribute Name</key> </entry>
> <entry> <key>Minimum Number of Entries</key><value>1</value> </entry>
> <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
> <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> * <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>*
> <entry> <key>Max Bin Age</key> </entry>
> <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
> <entry> <key>Delimiter Strategy</key><value>Text</value> </entry>
> <entry> <key>Header File</key> </entry>
> <entry> <key>Footer File</key> </entry>
> <entry> <key>Demarcator File</key> <value></value> </entry>
> <entry> <key>Compression Level</key> <value>1</value></entry>
> <entry> <key>Keep Path</key> <value>false</value> </entry>
> </properties>
>
>
> On Wed, May 31, 2017 at 3:52 PM, Joe Witt <joe.w...@gmail.com> wrote:
>
> > Split failed before even with backpressure:
> > - yes that backpressure kicks in when destination queues for a given
> > processor have reached their target size (in count of flowfiles or
> > total size represented). However, to clarify why the OOM happened it
> > is important to realize that it is not about 'flow files over a quick
> > period of time' but rather 'flow files held within a single process
> > session. Your SplitText was pulling a single flowfile but then
> > creating lets say 1,000,000 resulting flow files and then committing
> > that change. That happens within a session. But all those flow file
> > objects (not their content) are held in memory and at such high
> > numbers it creates excessive heap usage. The two phase divide/conquer
> > approach Koji suggested solves that and eventually we need to solve
> > that by swapping out the flowfiles to disk within a session. We
> > actually do swap out flowfiles sitting on queues after a certain
> > threshold is reached for this very reason. This means you should be
> > able to have many millions of flowfiles sitting around in the flow for
> > whatever reason and not hit memory problems.
> >
> > Hope that helps there.
> >
> > On PutHDFS it looks like possibly two things are trying to append to
> > the same file? If yes I'd really recommend not appending but rather
> > use MergeContent to create data bundles of a given size then write
> > those to HDFS.
> >
> > Thanks
> > Joe
> >
> > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <martineden...@gmail.com> wrote:
> > > Hi Koji,
> > >
> > > Good to know that it can handle large files. I thought it was the case but
> > > I was just not seeing in practice.
> > >
> > > Yes I am using 'Line Split Count' as 1 at SplitText.
> > >
> > > I added the extra SplitText processor exactly as you suggested and the OOM
> > > went away. So, big thanks!!!
> > >
> > > However I have 2 follow-up questions:
> > >
> > > 1. Before adding the extra SplitText processor I also played with the
> > > back-pressure settings on the outbound queue of the original SplitText
> > > processor, since you mentioned that it is generating files at a rate that
> > > is too high, I figure the queue should slow it down. I tried a limit of
> > > 100MB or 1000 files and I still got the OOMs in the SplitText processor.
> > > Why isn't the queue back-pressure helping me in this case? Where would that
> > > come in handy then? Why id the extra SplitText processor needed to fix
> > > things and not just the queue back-pressure?
> > >
> > > 2. I am now close to completing my flow but I am hitting another error.
> > > This time it's the last stage, the PutHDFS throws
> > > o.apache.nifi.processors.hadoop.PutHDFS
> > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
> > > due to org.apache.nifi.processor.exception.ProcessException: IOException
> > > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > > See the full stacktrace below.
> > > I have a parallelism of 1 for my PutHDFS processors. Any ideas why this is
> > > happening?
> > >
> > > Thanks,
> > > Martin
> > >
> > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5] o.apache.nifi.processors.hadoop.PutHDFS PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS due to org.apache.nifi.processor.exception.ProcessException: IOException thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > >     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > >     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > >     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at javax.security.auth.Subject.doAs(Subject.java:422)
> > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > >     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > > : {}
> > >
> > > org.apache.nifi.processor.exception.ProcessException: IOException thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > >     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > >     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > >     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at javax.security.auth.Subject.doAs(Subject.java:422)
> > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > >     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > >
> > >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)
> > >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)
> > >     at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at javax.security.auth.Subject.doAs(Subject.java:360)
> > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)
> > >     at org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
> > >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> > >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> > >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > >     at java.lang.Thread.run(Thread.java:748)
> > >
> > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > >     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > >     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > >     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at javax.security.auth.Subject.doAs(Subject.java:422)
> > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > >     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > >
> > >     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > >     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> > >     at com.sun.proxy.$Proxy188.append(Unknown Source)
> > >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > >     at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
> > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >     at java.lang.reflect.Method.invoke(Method.java:498)
> > >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> > >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> > >     at com.sun.proxy.$Proxy194.append(Unknown Source)
> > >     at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
> > >     at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
> > >     at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
> > >     at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
> > >     at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
> > >     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> > >     at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
> > >     at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
> > >     at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
> > >     at org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
> > >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)
> > >     ... 18 common frames omitted
> > >
> > > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <ijokaruma...@gmail.com> wrote:
> > >
> > >> Hi Martin,
> > >>
> > >> Generally, NiFi processor doesn't load entire content of file and is
> > >> capable of handling huge files.
> > >> However, having massive amount of FlowFiles can cause OOM issue as
> > >> FlowFiles and its Attributes resides on heap.
> > >>
> > >> I assume you are using 'Line Split Count' as 1 at SplitText.
> > >> We recommend to use multiple SplitText processors to not generate many
> > >> FlowFiles in a short period of time.
> > >> For example, 1st SplitText splits files per 5,000 lines, then the 2nd
> > >> SplitText splits into each line.
> > >> This way, we can decrease number of FlowFiles at a given time
> > >> requiring less heap.
> > >>
> > >> I hope this helps.
> > >>
> > >> Thanks,
> > >> Koji
> > >>
> > >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <martineden...@gmail.com> wrote:
> > >> > Hi all,
> > >> >
> > >> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> > >> >
> > >> > The flow I am trying to run is:
> > >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
> > >> > PutHDFS
> > >> >
> > >> > When I give it a 300MB input zip file (2.5GB uncompressed) I am getting
> > >> > Java OutOfMemoryError as below.
> > >> >
> > >> > Does NiFi read in the entire contents of files in memory? This is
> > >> > unexpected. I thought it is chunking through files. Giving more ram is not
> > >> > a solution as you can always get larger input files in the future.
> > >> >
> > >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> > >> >
> > >> > Can someone please explain what is happening and how to mitigate large
> > >> > files in NiFi? Any patterns?
> > >> >
> > >> > Thanks,
> > >> > M
> > >> >
> > >> > ERROR [Timer-Driven Process Thread-9] o.a.nifi.processors.standard.SplitText SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process session due to java.lang.OutOfMemoryError: Java heap space: {}
> > >> >
> > >> > java.lang.OutOfMemoryError: Java heap space
> > >> >     at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> > >> >     at java.util.HashMap.putMapEntries(HashMap.java:511)
> > >> >     at java.util.HashMap.<init>(HashMap.java:489)
> > >> >     at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> > >> >     at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
> > >> >     at org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
> > >> >     at org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
> > >> >     at org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
> > >> >     at org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
> > >> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > >> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> > >> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> > >> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > >> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> > >> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > >> >     at java.lang.Thread.run(Thread.java:748)