Hi, I've been having problems processing a 3.4TB data set (uncompressed tab-separated text) containing object creation/update events from our system, one event per line.
I decided to start with something I thought should be straightforward enough to succeed: a count of the number of events (i.e. the number of lines in the text files) and a count of the number of distinct object ids. The job stalled at the end of the first stage (55657 tasks; 1 task failed, but I've seen processing continue to the next stage despite small numbers of failures) even though it only generated a 5.3GB shuffle. It ran for 2.5 hours and is now sitting apparently doing nothing. Does this suggest something is wrong with the cluster? Computing either count should be feasible despite the size of the data set, or am I missing something?

The setup is a spark-ec2 generated cluster (trying EMR will be my next move, along with bucketing the data via Parquet) running Spark 1.5.2 on OpenJDK 8 (this is a Scala job, though others are Java), with r3.2xlarge instance types and 5 slaves, each with a 500GB EBS volume that SPARK_LOCAL_DIRS points to.

The code is:

    val sc = new SparkContext(conf)
    try {
      val rawSchema = StructType(Array(
        StructField("objectId", DataTypes.StringType, true),
        StructField("eventName", DataTypes.StringType, true),
        StructField("eventJson", DataTypes.StringType, true),
        StructField("timestampNanos", DataTypes.StringType, true)))
      val sqlContext = new SQLContext(sc)
      val df = sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .option("delimiter", "\t")
        .schema(rawSchema)
        .load(inputPath)
      val oids = df.select("objectId")
      val distinct = oids.distinct.count
      val events = oids.count
      println("Number of objectIds: " + distinct)
      println("Number of events: " + events)
      println("Elapsed time: " + (System.currentTimeMillis() - startMillis)/1000 + "s")
    } finally {
      sc.stop()
    }

Here's the plan as revealed by the SQL part of the UI:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#4L]
 Aggregate [objectId#0], [objectId#0]
  Project [objectId#0]
   Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)

== Analyzed Logical Plan ==
count: bigint
Aggregate [count(1) AS count#4L]
 Aggregate [objectId#0], [objectId#0]
  Project [objectId#0]
   Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)

== Optimized Logical Plan ==
Aggregate [count(1) AS count#4L]
 Aggregate [objectId#0]
  Project [objectId#0]
   Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)

== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#4L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#7L])
   TungstenAggregate(key=[objectId#0], functions=[], output=[])
    TungstenExchange hashpartitioning(objectId#0)
     TungstenAggregate(key=[objectId#0], functions=[], output=[objectId#0])
      Scan CsvRelation(<function0>,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,	,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true), StructField(eventName,StringType,true), StructField(eventJson,StringType,true), StructField(timestampNanos,StringType,true)),false,null)[objectId#0]

Code Generation: true

Regards,
James
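P.S. In case it's relevant to any suggestions: if an exact distinct count turns out to be too expensive at this scale, one fallback I'm aware of is the HyperLogLog-based approxCountDistinct aggregate, which keeps a small fixed-size sketch per partition instead of shuffling every distinct objectId by hash. A minimal local-mode sketch of that variant (the tiny inline DataFrame and the ApproxDistinctSketch name are just stand-ins for illustration, not my real job):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.approxCountDistinct

object ApproxDistinctSketch {
  // Estimates the distinct-objectId count for a toy stand-in DataFrame.
  def run(): Long = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("approx-distinct-sketch"))
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._
      // Tiny stand-in for the real 3.4TB input: five events, three distinct ids.
      val df = sc.parallelize(Seq("a", "b", "a", "c", "b")).toDF("objectId")
      // HyperLogLog-based estimate: one pass, small per-partition state,
      // no hash-partitioned shuffle of every distinct id.
      df.agg(approxCountDistinct("objectId")).first().getLong(0)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit =
    println("Approx distinct objectIds: " + run())
}
```

The trade-off is an approximate answer (with a configurable relative error), but it turns the two-exchange plan above into a single aggregation pass, which seems more likely to survive a 3.4TB scan.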