Alex,

The throughput is very low. Every bucket will have a WAL file open, so this
could be an issue with HDFS. How large is the cluster? Have you tried
running it with a small set of partitions?
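
If you want to force a single partition for a quick test, something like
this in the properties file should do it (the operator name below is a
placeholder for whatever the HDHT operator is called in your DAG):

  <property>
    <name>dt.operator.HDHTStore.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
  </property>

Also, a LeaseExpiredException saying "No lease on ... File does not exist"
usually means another client deleted or recreated the file while this
writer still held it open, which could fit more than one partition writing
to the same bucket WAL path.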

Thanks,
Thomas


On Sat, Apr 2, 2016 at 4:22 PM, McCullough, Alex <
[email protected]> wrote:

> Hello All,
>
> I work at Capital One and have been working on an application with many of
> the other Capital One folks active within the Apex community.
>
> We have run into an issue with HDHT that I was hoping to get some help
> with. We are processing customer account data from a flat file and storing
> lists of historical field aggregates in HDHT. When the DAG first starts
> with multiple partitions of the HDHT operators running, we get errors
> right out of the gate; the whole thing essentially grinds to a stop and
> has real trouble recovering. If I knock it down to a single partition, the
> errors do not occur.
>
> Could this be an issue with simultaneously trying to create so many
> directories and WALs? We have set a max of 1000 buckets for HDHT, each
> account has ~7 slices created/updated per tuple processed (representing
> lists of historical field values), and the DAG is processing ~250
> tuples/sec. Let me know if there is any other info that might be helpful;
> I have also included the two exceptions that get thrown.
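>
> For scale, those numbers work out to roughly:
>
>   ~250 tuples/sec x ~7 slices/tuple ≈ 1,750 slice writes/sec,
>   fanned out across up to 1,000 buckets, each with its own open WAL file
>
> (assuming one WAL per bucket, which matches the per-bucket _WAL-0 path in
> the errors below).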
>
> Thanks,
> Alex
>
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727):
> File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21,
> pendingcreates: 30]
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
> at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
> at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
> at
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
> at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
> at org.apache.hadoop.ipc.Client.call(Client.java:1472)
> at org.apache.hadoop.ipc.Client.call(Client.java:1403)
> at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
> at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
> at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
> at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
> at
> org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
> 2016-04-02 19:03:15,496 WARN org.apache.hadoop.hdfs.DFSClient: Error while
> syncing
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727):
> File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21,
> pendingcreates: 30]
> (stack trace identical to the first exception above)
>
>
> SECOND ERROR
>
> java.lang.RuntimeException: Failed to flush WAL
> at com.datatorrent.contrib.hdht.HDHTWriter.endWindow(HDHTWriter.java:555)
> at
> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:145)
> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:86)
> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:54)
> at com.datatorrent.stram.stream.OiOStream.put(OiOStream.java:67)
> at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:120)
> at
> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:154)
> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:86)
> at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:54)
> at com.datatorrent.stram.stream.OiOStream.put(OiOStream.java:67)
> at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:120)
> at
> com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:154)
> at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:351)
> at
> com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
> Caused by:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
> No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727):
> File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21,
> pendingcreates: 30]
> (stack trace identical to the one above)
>
