Turns out I was using the s3:// prefix (in a standalone Spark cluster).  It
was writing a LOT of block_* files to my S3 bucket, which was the cause for
the slowness.  I was coming from Amazon EMR, where Amazon's underlying FS
implementation has re-mapped s3:// to s3n://, which doesn't use the block_*
files.

On Sat, Dec 20, 2014 at 8:17 PM, Paul Brown <p...@mult.ifario.us> wrote:

>
> I would suggest checking out disk IO on the nodes in your cluster and then
> reading up on the limiting behaviors that accompany different kinds of EC2
> storage.  Depending on how things are configured for your nodes, you may
> have a local storage configuration that provides "bursty" IOPS where you
> get apparently good performance at first and then limiting kicks in and
> slows down the rate at which you can write data to local storage.
>
>
> --
> p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
>
> On Thu, Dec 18, 2014 at 5:56 AM, Jon Chase <jon.ch...@gmail.com> wrote:
>
>> I'm running a very simple Spark application that downloads files from S3,
>> does a bit of mapping, then uploads new files.  Each file is roughly 2MB
>> and is gzip'd.  I was running the same code on Amazon's EMR w/Spark and not
>> having any download speed issues (Amazon's EMR provides a custom
>> implementation of the s3n:// file system, FWIW).
>>
>> When I say exceedingly slow, I mean that it takes about 2 minutes to
>> download and process a 2MB file (this was taking ~2 seconds on the same
>> instance types in Amazon's EMR).  When I download the same file from the
>> EC2 machine with wget or curl, it downloads in ~ 1 second.  I've also done
>> other bandwidth checks for downloads from other external hosts - no speed
>> problems there.
>>
>> Tried this w/Spark 1.1.0 and 1.1.1.
>>
>> When I do a thread dump on a worker, I typically see this a lot:
>>
>>
>>
>> "Executor task launch worker-7" daemon prio=10 tid=0x00007fd174039000
>> nid=0x59e9 runnable [0x00007fd1f7dfb000]
>>    java.lang.Thread.State: RUNNABLE
>> at java.net.SocketInputStream.socketRead0(Native Method)
>> at java.net.SocketInputStream.read(SocketInputStream.java:152)
>> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>> at sun.security.ssl.InputRecord.read(InputRecord.java:480)
>> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>> - locked <0x00000007e44dd140> (a java.lang.Object)
>> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)
>> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>> - locked <0x00000007e44e1350> (a sun.security.ssl.AppInputStream)
>> at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
>> at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
>> - locked <0x00000007e44ea800> (a java.io.BufferedInputStream)
>> at
>> org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
>> at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
>> at
>> org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
>> at
>> org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
>> at
>> org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
>> at
>> org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
>> at
>> org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
>> at
>> org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
>> at
>> org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
>> at
>> org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
>> at
>> org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
>> at
>> org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
>> at
>> org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
>> at
>> org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
>> at
>> org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
>> at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
>> at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
>> at
>> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
>> at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>> at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>> at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source)
>> at
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330)
>> at
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432)
>> at
>> org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425)
>> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126)
>> at
>> org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256)
>> at
>> org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
>> at
>> org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126)
>> at
>> org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44)
>> at
>> org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99)
>> at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986)
>> at
>> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> at org.apache.spark.scheduler.Task.run(Task.scala:54)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>> Here's my pseudo code:
>>
>> Launch cluster:
>> ./spark-ec2 -k x -i x --wait=480 -m m3.xlarge -t m3.xlarge -s 2
>> --spot-price=.1 -r eu-west-1 --master-opts=-Dspark.eventLog.enabled=true
>> launch spark-ec2
>>
>>
>> Run job (from the master):
>> ~/spark/bin/spark-submit --class com.MyClass --master
>> spark://ec2-xx-xx-xx-xx:7077 --deploy-mode client --driver-memory 6G
>> --executor-memory 2G --conf spark.eventLog.enabled=true ~/my.jar --in-path
>> s3n://bucket/path/prefix-* --out-path s3n://bucket/outpath
>>
>>
>> Job (Java pseudo):
>>
>>     rdd=   ctx.textFile(cmd.inPath)
>>                  .map(return parser.parse(line))
>>                 .filter(return targetDate.equals(logLine.timestamp))
>>                 .keyBy(return p.partitionKeyFor(logLine))
>>
>>
>>             FileOutputFormat.setCompressOutput(jobConf, true);
>>             FileOutputFormat.setOutputCompressorClass(jobConf,
>> GzipCodec.class);
>>
>>
>>       rdd.saveAsHadoopDataset(jobConf);
>>
>>
>> Unfortunately, I haven't been able to get debugging turned up - I'm using
>> slf4j/logback w/the commons-logging and log4j bridges.  Any pointers for
>> getting that turned up to DEBUG would be helpful too.
>>
>>
>> I've tried everything I can think of and am at my wit's end - any
>> troubleshooting suggestions would be greatly appreciated!
>>
>>
>>
>

Reply via email to