Hi Alex,

HDHT flushes its WAL on every endWindow. If you have lots of HDHT buckets in the DAG, the number of WAL file flushes will be correspondingly high. I suggest increasing the APPLICATION_WINDOW_COUNT attribute on the HDHT operator to lengthen the interval between WAL flushes.
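As a minimal sketch of what that looks like in populateDAG() (the operator name "hdhtStore" and the value 120 are placeholders for whatever fits your application; with the default 500 ms streaming window, 120 windows is roughly one WAL flush per minute):

    import org.apache.hadoop.conf.Configuration;

    import com.datatorrent.api.Context.OperatorContext;
    import com.datatorrent.api.DAG;
    import com.datatorrent.api.StreamingApplication;
    import com.datatorrent.contrib.hdht.HDHTWriter;

    public class Application implements StreamingApplication
    {
      @Override
      public void populateDAG(DAG dag, Configuration conf)
      {
        // "hdhtStore" is a placeholder name; configure the file store,
        // bucket count, etc. the same way you do today.
        HDHTWriter hdhtStore = dag.addOperator("hdhtStore", new HDHTWriter());
        // Flush the WAL once every 120 streaming windows (~60 s at the
        // default 500 ms window) instead of once per endWindow.
        dag.setAttribute(hdhtStore, OperatorContext.APPLICATION_WINDOW_COUNT, 120);
      }
    }

The same attribute can also be set without a code change through configuration, e.g. a dt.operator.hdhtStore.attr.APPLICATION_WINDOW_COUNT property in your properties.xml. The trade-off is that state reaches the WAL less frequently, so pick a value that matches your latency requirements.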
Regards,
-Tushar.

On Sun, Apr 3, 2016 at 4:52 AM, McCullough, Alex <[email protected]> wrote:

Hello All,

I work at Capital One and have been working on an application with many of the other Capital One folks active within the Apex community.

We have run into an issue with HDHT that I was hoping to get some help with. We are processing customer account data from a flat file and storing lists of historical field aggregates in HDHT. When the DAG first starts, if the HDHT operators are running with more than one partition, we get errors straight out of the gate; it essentially grinds the whole thing to a stop and has real trouble recovering. If I knock it down to a single partition, the errors do not occur.

Could this be an issue with simultaneously trying to create so many directories and WALs? We have set a maximum of 1000 buckets for HDHT, each account has ~7 slices created/updated per tuple processed (representing lists of historical field values), and the DAG is processing ~250 tuples/sec. Let me know if there is any other info that might be helpful; I have also included the two exceptions that get thrown.

Thanks,
Alex

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21, pendingcreates: 30]
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at org.apache.hadoop.ipc.Client.call(Client.java:1472)
    at org.apache.hadoop.ipc.Client.call(Client.java:1403)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
2016-04-02 19:03:15,496 WARN org.apache.hadoop.hdfs.DFSClient: Error while syncing
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21, pendingcreates: 30]
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at org.apache.hadoop.ipc.Client.call(Client.java:1472)
    at org.apache.hadoop.ipc.Client.call(Client.java:1403)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)

SECOND ERROR

java.lang.RuntimeException: Failed to flush WAL
    at com.datatorrent.contrib.hdht.HDHTWriter.endWindow(HDHTWriter.java:555)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:145)
    at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:86)
    at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:54)
    at com.datatorrent.stram.stream.OiOStream.put(OiOStream.java:67)
    at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:120)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:154)
    at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:86)
    at com.datatorrent.stram.engine.OiONode$ControlSink.put(OiONode.java:54)
    at com.datatorrent.stram.stream.OiOStream.put(OiOStream.java:67)
    at com.datatorrent.stram.stream.MuxStream.put(MuxStream.java:120)
    at com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:154)
    at com.datatorrent.stram.engine.GenericNode.run(GenericNode.java:351)
    at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/vault8/citadel_data/hdht/702/_WAL-0 (inode 77399727): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_586890524_21, pendingcreates: 30]
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3602)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3399)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3255)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:676)
    at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.addBlock(AuthorizationProviderProxyClientProtocol.java:212)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:483)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2086)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2082)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2080)
    at org.apache.hadoop.ipc.Client.call(Client.java:1472)
    at org.apache.hadoop.ipc.Client.call(Client.java:1403)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
    at com.sun.proxy.$Proxy14.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:399)
    at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:252)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
    at com.sun.proxy.$Proxy15.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1674)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1471)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:668)
