I would suggest checking out disk IO on the nodes in your cluster and then
reading up on the limiting behaviors that accompany different kinds of EC2
storage.  Depending on how things are configured for your nodes, you may
have a local storage configuration that provides "bursty" IOPS where you
get apparently good performance at first and then limiting kicks in and
slows down the rate at which you can write data to local storage.


—
p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/

On Thu, Dec 18, 2014 at 5:56 AM, Jon Chase <jon.ch...@gmail.com> wrote:

> I'm running a very simple Spark application that downloads files from S3,
> does a bit of mapping, then uploads new files.  Each file is roughly 2MB
> and is gzip'd.  I was running the same code on Amazon's EMR w/Spark and not
> having any download speed issues (Amazon's EMR provides a custom
> implementation of the s3n:// file system, FWIW).
>
> When I say exceedingly slow, I mean that it takes about 2 minutes to
> download and process a 2MB file (this was taking ~2 seconds on the same
> instance types in Amazon's EMR).  When I download the same file from the
> EC2 machine with wget or curl, it downloads in ~ 1 second.  I've also done
> other bandwidth checks for downloads from other external hosts - no speed
> problems there.
>
> Tried this w/Spark 1.1.0 and 1.1.1.
>
> When I do a thread dump on a worker, I typically see this a lot:
>
>
>
> "Executor task launch worker-7" daemon prio=10 tid=0x00007fd174039000
> nid=0x59e9 runnable [0x00007fd1f7dfb000]
>    java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.read(SocketInputStream.java:152)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
> at sun.security.ssl.InputRecord.read(InputRecord.java:480)
> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
> - locked <0x00000007e44dd140> (a java.lang.Object)
> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
> - locked <0x00000007e44e1350> (a sun.security.ssl.AppInputStream)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
> - locked <0x00000007e44ea800> (a java.io.BufferedInputStream)
> at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
> at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
> at
> org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
> at
> org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
> at
> org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
> at
> org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
> at
> org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
> at
> org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
> at
> org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
> at
> org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
> at
> org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
> at
> org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
> at
> org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
> at
> org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
> at
> org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
> at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
> at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
> at
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
> at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
> at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
> at
> org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
> at
> org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
> at
> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126)
> at
> org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44)
> at
> org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99)
> at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> at org.apache.spark.scheduler.Task.run(Task.scala:54)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> Here's my pseudo code:
>
> Launch cluster:
> ./spark-ec2 -k x -i x --wait=480 -m m3.xlarge -t m3.xlarge -s 2
> --spot-price=.1 -r eu-west-1 --master-opts=-Dspark.eventLog.enabled=true
> launch spark-ec2
>
>
> Run job (from the master):
> ~/spark/bin/spark-submit --class com.MyClass --master
> spark://ec2-xx-xx-xx-xx:7077 --deploy-mode client --driver-memory 6G
> --executor-memory 2G --conf spark.eventLog.enabled=true ~/my.jar --in-path
> s3n://bucket/path/prefix-* --out-path s3n://bucket/outpath
>
>
> Job (Java pseudo):
>
>     rdd=   ctx.textFile(cmd.inPath)
>                  .map(return parser.parse(line))
>                 .filter(return targetDate.equals(logLine.timestamp))
>                 .keyBy(return p.partitionKeyFor(logLine))
>
>
>             FileOutputFormat.setCompressOutput(jobConf, true);
>             FileOutputFormat.setOutputCompressorClass(jobConf,
> GzipCodec.class);
>
>
>       rdd.saveAsHadoopDataset(jobConf);
>
>
> Unfortunately, I haven't been able to get debugging turned up - I'm using
> slf4j/logback w/the commons-logging and log4j bridges.  Any pointers for
> getting that turned up to DEBUG would be helpful too.
>
>
> I've tried everything I can think of and am at my wit's end - any
> troubleshooting suggestions would be greatly appreciated!
>
>
>

Reply via email to