Spark data incorrect when more than 200 tasks

2015-02-18 Thread lbierman
I'm fairly new to Spark. 
We have data in Avro files on HDFS, and we are trying to load all of the Avro
files (about 28 GB at the moment) and run an aggregation.

When there are fewer than 200 tasks, the job runs and produces the correct
results. When there are more than 200 tasks (as reported in the logs by the
TaskSetManager), the data seems to group only by the first record of each Avro
file as the RDD is read in from HDFS.

If I set spark.shuffle.sort.bypassMergeThreshold to a value greater than 200,
the data comes out correct. I don't understand why or how.
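For reference, when testing the workaround I just add the property to the
spark-submit invocation (250 is an arbitrary value above my task count; setting
it on the SparkConf below would presumably work too):

    --conf spark.shuffle.sort.bypassMergeThreshold=250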

Here are the relevant pieces of code:
JavaSparkContext context = new JavaSparkContext(
    new SparkConf()
        .setAppName(AnalyticsJob.class.getSimpleName())
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
);

// Read all Avro files under the input directory, recursively.
context.hadoopConfiguration().set(
    "mapreduce.input.fileinputformat.input.dir.recursive", "true");
context.hadoopConfiguration().set(
    "mapreduce.input.fileinputformat.inputdir", job.inputDirectory);

// Load the Avro records, keep only events that have a step event key,
// then collapse duplicate events down to one per key.
JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
    context.newAPIHadoopRDD(
        context.hadoopConfiguration(),
        AvroKeyInputFormat.class,
        AvroKey.class,
        NullWritable.class
    ).keys())
    .map(event -> event.datum())
    .filter(event -> Optional.ofNullable(event.getStepEventKey()).isPresent())
    .mapToPair(event -> new Tuple2<>(event, 1))
    .groupByKey()
    .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());

If I do a collect on events at this point, the data is jumbled and not what I
expect, so when we pass it on to the next job in our pipeline for aggregation,
the results don't come out as expected either.
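For what it's worth, this is roughly how I'm eyeballing the result, just
pulling a small sample back to the driver rather than a full collect:

    // quick sanity check on the driver; take() avoids pulling all 28 GB back
    events.take(20).forEach(e -> System.out.println(e.getStepEventKey()));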

The downstream job maps the events to pairs again and stores the results in the database.

Thanks in advance for any help.







Spark Performance on Yarn

2015-02-19 Thread lbierman
I'm a bit new to Spark, but I have a question on performance. I suspect a lot
of my issue is down to tuning and parameters. I have a Hive external table on
the same data, and queries against it run in minutes.

The Job:
+ 40 GB of Avro events on HDFS (100 million+ events)
+ Read the files in from HDFS and dedupe events by key (mapToPair, then a
reduceByKey)
+ The resulting RDD is persisted (memory and disk)
+ It is then passed to a job that does a mapToPair into new aggregate objects,
a reduceByKey, and finally a foreachPartition to do the work
The issue:
When I run this on YARN in my environment it takes 20+ hours. We see the first
stage run and build the deduplicated RDD, but when the next stage starts, tasks
fail and data is lost. That sends stage 0 back to the start over and over,
dragging the whole job out.

Errors I see in the driver logs:
ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X: remote
Akka client disassociated

15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1
(TID 1335,): FetchFailed(BlockManagerId(3, i, 33958), shuffleId=1,
mapId=162, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect
to X/X:33958

We also see the following, but I suspect it's just a consequence of the
previous stage failing before the next one starts:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 1

Cluster:
5 machines, each with 2 cores and 8 GB of RAM

Spark-submit command:
 spark-submit --class com.myco.SparkJob \
     --master yarn \
     /tmp/sparkjob.jar
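For completeness, the fuller form I've been experimenting with looks roughly
like this; the executor sizing is only a guess for 2-core / 8 GB nodes (leaving
headroom for YARN itself), not something I've validated:

    spark-submit --class com.myco.SparkJob \
        --master yarn \
        --num-executors 5 \
        --executor-cores 2 \
        --executor-memory 2g \
        /tmp/sparkjob.jar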

Any thoughts on where to look, how to start approaching this problem, or which
additional data points I should provide would be appreciated.

Thanks..

Code for the job:
JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
    context.newAPIHadoopRDD(
        context.hadoopConfiguration(),
        AvroKeyInputFormat.class,
        AvroKey.class,
        NullWritable.class
    ).keys())
    // copy each Avro datum into a fresh AnalyticsEvent
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(event -> Optional.ofNullable(event.getStepEventKey()).isPresent())
    // dedupe by keying on the event itself; the distinct keys are the unique events
    .mapToPair(event -> new Tuple2<>(event, 1))
    .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
    .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());

events.mapToPair(event -> new Tuple2<>(
        keySelector.select(event),
        new RunningAggregates(
            Optional.ofNullable(event.getVisitors()).orElse(0L),
            Optional.ofNullable(event.getImpressions()).orElse(0L),
            Optional.ofNullable(event.getAmount()).orElse(0.0D),
            Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D))))
    .reduceByKey((left, right) -> left.add(right))
    .foreachPartition(dostuff);









Re: Spark Performance on Yarn

2015-02-20 Thread lbierman
A bit more context on this issue; the container log from the executor is below.

Given my cluster specs above, what would be appropriate values to pass for
--num-executors, --executor-cores, and --executor-memory?

I had tried it with --executor-memory 2500MB
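One knob that might matter here (I haven't confirmed this) is the extra memory
YARN sets aside for the executor on top of the JVM heap
(spark.yarn.executor.memoryOverhead), since the container in the log below is
killed for exceeding its physical memory limit. Roughly what I have in mind;
the 512 MB figure is just a guess:

    spark-submit --class com.myco.SparkJob \
        --master yarn \
        --executor-memory 2g \
        --conf spark.yarn.executor.memoryOverhead=512 \
        /tmp/sparkjob.jar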

2015-02-20 06:50:09,056 WARN
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is
running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB
physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing
container.
Dump of the process-tree for container_1423083596644_0238_01_004160 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c
/usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m
-Xmx2400m 
-Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler
8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1>
/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout
2>
/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
|- 23323 23320 23320 23320 (java) 922271 12263 461976 724218
/usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m
-Xmx2400m
-Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp
-Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse







Deduping events using Spark

2015-06-04 Thread lbierman
I'm still a bit new to Spark and am struggling to figure out the best way to
dedupe my events.

I load my Avro files from HDFS and then I want to dedupe events that have
the same nonce. 

For example, here is my code so far:

JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>)
    context.newAPIHadoopRDD(
        context.hadoopConfiguration(),
        AvroKeyInputFormat.class,
        AvroKey.class,
        NullWritable.class
    ).keys())
    .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
    .filter(event -> Optional.ofNullable(event.getStepEventKey()).isPresent());

Now I want to get back an RDD of unique AnalyticsEvents. Basically, if
analyticsEvent1.getNonce() equals analyticsEvent2.getNonce(), I only want to
keep one of them.

I'm not sure how to do this. If I do a reduceByKey, doesn't it reduce by the
AnalyticsEvent object itself rather than by a value inside it?
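What I've sketched so far (I'm not sure this is the right approach) is keying
by the nonce first and then reducing:

    // key each event by its nonce, keep an arbitrary event per nonce, then drop the keys
    JavaRDD<AnalyticsEvent> deduped = events
        .mapToPair(event -> new Tuple2<>(event.getNonce(), event))
        .reduceByKey((left, right) -> left)
        .values();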

Any guidance on how I can walk this set of events and return only the events
with unique nonces would be much appreciated.





