Thanks, I did that and now I am getting an out-of-memory error, but I am not sure where it occurs. It can't be on the Spark executor, as I have 28GB allocated to it. It is not the driver, because I run this locally and monitor it via jvisualvm. Unfortunately I can't JMX-monitor Hadoop.

From the stack trace it seems to fail remotely, after

    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)

Maybe at the namenode. I will try to increase its memory.




java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:561)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy10.getListing(Unknown Source)
*_    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)_*
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
    at org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:47)
    at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:43)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:555)
    at com.stratified.crossref.CitationExtractorJob$.extractCitations(CitationExtractorJob.scala:78)
    at com.stratified.crossref.CitationExtractorJob$.execute(CitationExtractorJob.scala:32)
    at com.stratified.crossref.CitationExtractorJob$.main(CitationExtractorJob.scala:20)
    at com.stratified.crossref.CitationExtractorJob.main(CitationExtractorJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)
    ... 41 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.google.protobuf.ByteString.copyFrom(ByteString.java:192)
    at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.<init>(HdfsProtos.java:21261)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.<init>(HdfsProtos.java:21172)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto$1.parsePartialFrom(HdfsProtos.java:21360)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto$1.parsePartialFrom(HdfsProtos.java:21355)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto.<init>(HdfsProtos.java:24929)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto.<init>(HdfsProtos.java:24876)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto$1.parsePartialFrom(HdfsProtos.java:24970)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto$1.parsePartialFrom(HdfsProtos.java:24965)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto.<init>(ClientNamenodeProtocolProtos.java:26824)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto.<init>(ClientNamenodeProtocolProtos.java:26771)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$1.parsePartialFrom(ClientNamenodeProtocolProtos.java:26862)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$1.parsePartialFrom(ClientNamenodeProtocolProtos.java:26857)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$Builder.mergeFrom(ClientNamenodeProtocolProtos.java:27167)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$Builder.mergeFrom(ClientNamenodeProtocolProtos.java:27050)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:337)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:267)
    at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:170)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:882)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:267)
    at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:161)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:875)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:267)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:264)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)




On 08/06/15 15:12, Ewan Leith wrote:
Try putting a * on the end of xmlDir, i.e.

xmlDir = hdfs:///abc/def/*

Rather than

xmlDir = hdfs:///abc/def

and see what happens. I don't know why, but that appears to be more reliable 
for me with S3 as the filesystem.
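
The suggestion amounts to passing a glob over the directory's children instead of the bare directory. A minimal sketch of that normalisation; `withTrailingGlob` is a hypothetical helper name, not part of Spark or of this thread, and the paths are the placeholders used above:

```scala
// Hypothetical helper (an assumption for illustration): append "/*" so the
// input path is an explicit glob over the directory's children.
def withTrailingGlob(dir: String): String = {
  val trimmed = dir.stripSuffix("/")          // drop one trailing slash, if any
  if (trimmed.endsWith("/*")) trimmed         // already a glob: leave unchanged
  else trimmed + "/*"
}

// With a SparkContext `sc` in scope, the result would be used as:
//   val xmls = sc.binaryFiles(withTrailingGlob(xmlDir))
```

Whether the glob form behaves differently on HDFS is not established here; the observation above was made with S3.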

I'm also using binaryFiles, but I've tried running the same command with
wholeTextFiles and had the same error.

Ewan

-----Original Message-----
From: Kostas Kougios [mailto:kostas.koug...@googlemail.com]
Sent: 08 June 2015 15:02
To: user@spark.apache.org
Subject: spark timesout maybe due to binaryFiles() with more than 1 million 
files in HDFS

I am reading millions of xml files via

val xmls = sc.binaryFiles(xmlDir)

The operation runs fine locally but on yarn it fails with:

  client token: N/A
  diagnostics: Application application_1433491939773_0012 failed 2 times due to ApplicationMaster for attempt appattempt_1433491939773_0012_000002 timed out. Failing the application.
  ApplicationMaster host: N/A
  ApplicationMaster RPC port: -1
  queue: default
  start time: 1433750951883
  final status: FAILED
  tracking URL: http://controller01:8088/cluster/app/application_1433491939773_0012
  user: ariskk
Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

In the Hadoop userlogs I frequently get these messages:

15/06/08 09:15:38 WARN util.AkkaUtils: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@2b4f336b,BlockManagerId(1, controller01.stratified, 58510))] in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)

I run my Spark job via spark-submit, and it works for another HDFS directory
that contains only 37k files. Any ideas on how to resolve this?
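
Since the 37k-file directory works, one way to narrow this down is to run the job on a growing share of the input and see at what size the listing starts to fail. A minimal sketch, assuming the input can be addressed as several narrower paths (for example subdirectories, which the thread does not confirm exist); `pathBatches` and the example paths are hypothetical:

```scala
// Hypothetical helper (not from the thread): split candidate input paths into
// fixed-size batches so the job can be tried on a fraction of the input.
def pathBatches(paths: Seq[String], batchSize: Int): List[Seq[String]] = {
  require(batchSize > 0, "batchSize must be positive")
  paths.grouped(batchSize).toList
}

// Placeholder paths for illustration only.
val batches = pathBatches(
  Seq("hdfs:///abc/def/a", "hdfs:///abc/def/b", "hdfs:///abc/def/c"), 2)

// With a SparkContext `sc` in scope, a single batch could be read as:
//   val xmls = sc.union(batches.head.map(p => sc.binaryFiles(p)))
```

This is only a diagnostic aid for finding the size at which the failure appears, not a confirmed fix.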




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-timesout-maybe-due-to-binaryFiles-with-more-than-1-million-files-in-HDFS-tp23208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
