EMR: Use extra mounted EBS volumes for spark.local.dir

2017-10-08 Thread Tushar Sudake
Hello everyone,

I'm using 'r4.8xlarge' instances on EMR for my Spark Application.
To each node, I'm attaching one 512 GB EBS volume.

By logging into the nodes, I tried to verify that this volume is being set
as 'spark.local.dir' by EMR automatically, but couldn't find any such
configuration.

Can someone please confirm this? Do I need to do it myself through
bootstrap steps?

Thanks,
Tushar
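
For what it's worth: with Spark on YARN (which is how EMR runs Spark),
executor scratch space follows yarn.nodemanager.local-dirs and
spark.local.dir is overridden, so yarn-site is the place to look. Below is
a minimal sketch of forcing it yourself via an EMR configuration
classification at cluster creation; the /mnt1 mount point is an assumption,
so verify the actual EBS mount point on a node first:

[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.local-dirs": "/mnt/yarn,/mnt1/yarn"
    }
  }
]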


Re: Quick one... AWS SDK version?

2017-10-08 Thread Tushar Sudake
Hi Jonathan,

Does that mean Hadoop-AWS 2.7.3 is also built against AWS SDK 1.11.160,
and not 1.7.4?

Thanks.


On Oct 7, 2017 3:50 PM, "Jean Georges Perrin"  wrote:


Hey Marco,

I am actually reading from S3 and I use 2.7.3, but I inherited the project
and it uses some AWS APIs from the Amazon SDK, in a version that is like
from yesterday :) so things got confused, and AMZ is changing its version
like crazy, so it's a little difficult to follow. Right now I went back to
2.7.3 and SDK 1.7.4...

jg


On Oct 7, 2017, at 15:34, Marco Mistroni  wrote:

Hi JG
 out of curiosity, what's your use case? Are you writing to S3? You could
use Spark to do that, e.g. using the Hadoop package
org.apache.hadoop:hadoop-aws:2.7.1; that will pull in the AWS client
version that is in line with Hadoop 2.7.1.

hth
 marco

On Fri, Oct 6, 2017 at 10:58 PM, Jonathan Kelly 
wrote:

> Note: EMR builds Hadoop, Spark, et al. from source against specific
> versions of certain packages (the AWS Java SDK, httpclient/core, Jackson,
> etc.), sometimes patching these applications so that they work with
> dependency versions that differ from what they support upstream.
>
> For emr-5.8.0, we have built Hadoop and Spark (the Spark Kinesis
> connector, that is, since that's the only part of Spark that actually
> depends upon the AWS Java SDK directly) against AWS Java SDK 1.11.160
> instead of the much older version that vanilla Hadoop 2.7.3 would otherwise
> depend upon.
>
> ~ Jonathan
>
> On Wed, Oct 4, 2017 at 7:17 AM Steve Loughran 
> wrote:
>
>> On 3 Oct 2017, at 21:37, JG Perrin  wrote:
>>
>> Sorry Steve – I may not have been very clear: thinking about
>> aws-java-sdk-z.yy.xxx.jar. To the best of my knowledge, none is bundled
>> with Spark.
>>
>>
>>
>> I know, but if you are talking to S3 via the s3a client, you will need
>> the AWS SDK version to match the one the hadoop-aws JAR for your version
>> of Hadoop was built against. Similarly, if you are using spark-kinesis,
>> it needs to be in sync there.
>>
>>
>> From: Steve Loughran [mailto:ste...@hortonworks.com]
>> Sent: Tuesday, October 03, 2017 2:20 PM
>> To: JG Perrin
>> Cc: user@spark.apache.org
>> Subject: Re: Quick one... AWS SDK version?
>>
>>
>>
>> On 3 Oct 2017, at 02:28, JG Perrin  wrote:
>>
>> Hey Sparkians,
>>
>> What version of AWS Java SDK do you use with Spark 2.2? Do you stick with
>> the Hadoop 2.7.3 libs?
>>
>>
>> You generally have to stick with the version which Hadoop was built
>> with, I'm afraid... very brittle dependency.
>>
>>
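
For reference, a build-file sketch of the pinning described above, as an
sbt fragment; the Hadoop 2.7.3 <-> AWS SDK 1.7.4 pairing is the one named
in this thread, and should be verified against the hadoop-aws POM for your
exact Hadoop release:

// Keep aws-java-sdk in lock-step with the version hadoop-aws was built
// against (1.7.4 for Hadoop 2.7.x, per this thread).
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws"   % "2.7.3",
  "com.amazonaws"     % "aws-java-sdk" % "1.7.4"
)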


How to merge fragmented IDs into one cluster if one/more IDs are shared

2017-10-05 Thread Tushar Sudake
Hello Sparkians,

I want to merge the following clusters / sets of IDs into one when they
share IDs.

For example:

uuid_3_1,uuid_3_2,uuid_3_3,uuid_3_4
uuid_3_2,uuid_3_5,uuid_3_6
uuid_3_5,uuid_3_7,uuid_3_8,uuid_3_9

into single:

uuid_3_1,uuid_3_2,uuid_3_3,uuid_3_4,uuid_3_5,uuid_3_6,uuid_3_7,uuid_3_8,uuid_3_9

because they're linked through 'uuid_3_2' and 'uuid_3_5'.

How can I do this in Spark?

One solution I can think of is to use GraphX: keep adding edges between
pairs of IDs and let connected components take care of forming the
clusters. But these are UUIDs, and GraphX only supports Long for VertexId.
Also, my input data is huge (50M unique IDs), so maintaining a
collision-free UUID <-> Long map will be tough.
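
A minimal sketch of that GraphX route, sidestepping the collision concern
by assigning vertex ids with zipWithUniqueId instead of a hash; `lines`
(an RDD[String] of the comma-separated rows), `sparkSession`, and the input
path are assumptions:

import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD

// Assumed input: one comma-separated row of UUIDs per line.
val lines: RDD[String] = sparkSession.sparkContext.textFile("s3a://...") // hypothetical path
val rows: RDD[Array[String]] = lines.map(_.split(","))

// Collision-free UUID <-> Long mapping without hashing.
val idMap: RDD[(String, Long)] = rows.flatMap(r => r).distinct().zipWithUniqueId()

// A star of edges per row (head -> each other ID) is enough for connectivity.
val edges: RDD[(Long, Long)] = rows
  .flatMap(r => r.tail.map(other => (r.head, other))) // (srcUuid, dstUuid)
  .join(idMap)                                        // (srcUuid, (dstUuid, srcId))
  .map { case (_, (dstUuid, srcId)) => (dstUuid, srcId) }
  .join(idMap)                                        // (dstUuid, (srcId, dstId))
  .map { case (_, (srcId, dstId)) => (srcId, dstId) }

// Each vertex gets labeled with the smallest vertex id in its component.
val components = Graph.fromEdgeTuples(edges, defaultValue = 0).connectedComponents().vertices

// Translate the component labels back into clusters of UUIDs.
val clusters: RDD[(Long, Iterable[String])] = idMap
  .map(_.swap)                                        // (id, uuid)
  .join(components)                                   // (id, (uuid, componentId))
  .map { case (_, (uuid, componentId)) => (componentId, uuid) }
  .groupByKey()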

Any suggestions?

Thanks!


'Premature end of Content-Length' using S3A to read huge data

2017-08-21 Thread Tushar Sudake
Hello,

I'm writing a Spark-based application that works on a pretty huge dataset
stored on S3. It's about **15 TB** in size uncompressed. The data is laid
out across many small LZO-compressed files, varying from 10 to 100 MB.

By default the job spawns 130k tasks while reading the dataset and mapping
it to the schema.

It then fails at around 70k task completions, after ~20 task failures.

**Exception:**

    WARN lzo.LzopInputStream: IOException in getCompressedData; likely LZO corruption.
    org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body

Looks like the S3 connection is getting closed prematurely.

**I have tried nearly 40 different combos of configurations.**

**To summarize them:**
- 1 to 3 executors per node
- 18 GB to 42 GB `--executor-memory`
- 3 to 5 `--executor-cores`
- 1.8 GB to 4.0 GB `spark.yarn.executor.memoryOverhead`
- both Kryo and the default Java serializer
- 0.5 to 0.35 `spark.memory.storageFraction`
- default, and 13 to 20 partitions for the bigger dataset
- default, and 200 to 2001 `spark.sql.shuffle.partitions`

**And most importantly:** 100 to 2048 for the `fs.s3a.connection.maximum`
property. [This seems to be the property most relevant to the exception.]

[In all cases the driver was set to memory = 51 GB, cores = 12, and the
`MEMORY_AND_DISK_SER` level was used for caching.]

Nothing worked!

If I run the program on half of the bigger dataset (7.5 TB), it finishes
successfully in 1.5 hr.

1. What could I be doing wrong?
2. How do I determine the optimal value for `fs.s3a.connection.maximum`?
(A sizing sketch follows this list.)
3. Is it possible that the S3 clients are getting GCed?
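
On question 2, a back-of-the-envelope sizing sketch. The reasoning is an
assumption about S3A internals (the connection pool lives per executor JVM,
and each running task holds roughly one connection while reading its
split), not something established in this thread:

// Hypothetical heuristic: cover the executor's concurrent readers plus
// generous slack for retried or aborted streams.
val executorCores = 5                 // from --executor-cores
val headroomFactor = 4                // hypothetical slack factor
val s3aConnectionMaximum = executorCores * headroomFactor // => 20

If a value this small already suffices, pool size is unlikely to be the
culprit, which would point away from `fs.s3a.connection.maximum` as the fix.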

Any help will be appreciated!

***Environment:***

AWS EMR 5.7.0, 60 x i2.2xlarge SPOT Instances (16 vCPU, 61GB RAM, 2 x 800GB
SSD), Spark 2.1.0

YARN is used as resource manager.

***Code:***

It's a fairly simple job, doing something like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.count
import org.apache.spark.storage.StorageLevel

val sl = StorageLevel.MEMORY_AND_DISK_SER

// Hadoop/S3A configuration for LZO input and the S3A connection pool.
val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
hadoopConf.set("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec")
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.setInt("fs.s3a.connection.maximum", 1200)

val dataset_1: DataFrame = sparkSession
  .read
  .option("delimiter", ",")
  .schema(...) // schema elided in the original
  .csv("s3a://...")
  .select("ID") // 15 TB

dataset_1.persist(sl)

print(dataset_1.count())

// Rows per ID, then the distribution of those counts.
val tmp = dataset_1.groupBy("ID").agg(count("*").alias("count_id"))
val tmp2 = tmp.groupBy("count_id").agg(count("*").alias("count_count_id"))
tmp2.write.csv(...)

dataset_1.unpersist()

***Full Stacktrace:***

17/08/21 20:02:36 INFO compress.CodecPool: Got brand-new decompressor [.lzo]
17/08/21 20:06:18 WARN lzo.LzopInputStream: IOException in getCompressedData; likely LZO corruption.
org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 79627927; received: 19388396)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:180)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.read(S3ObjectInputStream.java:155)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:151)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:108)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:72)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.read(S3ObjectInputStream.java:155)
    at org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:160)
    at java.io.DataInputStream.read(DataInputStream.java:149)
    at com.hadoop.compression.lzo.LzopInputStream.readFully(LzopInputStream.java:73)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:321)
    at com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)
    at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
    at java.io.InputStream.read(InputStream.java:101)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultL