Re: Spark SQL table Join, one task is taking long

2014-12-04 Thread Venkat Subramanian
Hi Cheng,

Thank you very much for taking the time to provide a detailed explanation.
I tried a few of the things you suggested, plus some more.

The ContactDetail table (8 GB) is the fact table and DAgents is the dim
table (500 KB), the reverse of what you were assuming, but your ideas still
apply.

I tried the following:

a) Cached the smaller dim table in memory:

 sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1000")
 sqlContext.cacheTable("DAgents")

The Spark UI (Storage tab) shows it cached as an RDD when I run it.
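One caveat on the threshold value (my own reading of the docs, so treat it as
an assumption): spark.sql.autoBroadcastJoinThreshold is specified in bytes, so
a value of 1000 would be far below the ~500 KB dim table and would effectively
rule the broadcast out. A minimal sketch of a value that should cover it:

// the threshold is in bytes; 1 MB comfortably covers the ~500 KB DAgents table
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (1024 * 1024).toString)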

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE
ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")

CDJoinQry.map(ta => ta(4)).count

I see no difference in performance; the query takes about the same time,
~1.2 min.
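To verify which physical join strategy was actually picked, I believe the
executed plan can be printed (queryExecution is a developer API, so this is a
best-effort sketch; a broadcast join should show up as a BroadcastHashJoin
node rather than a shuffled join):

// print the physical plan chosen for the join
println(CDJoinQry.queryExecution.executedPlan)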

b) I reversed both the table order and the WHERE clause order in the query:

val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE
DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

Performance got much worse: it took 6-7 min to complete.

Changing only the table order in the SELECT for this join, while keeping the
same WHERE clause order, performance was similar (1.2-1.4 min).

c) Using the query in a), I tried to keep the cached storage in compressed
columnar form with

 sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

I see no difference in performance; the query again takes ~1.2 min.
I am not sure the setting even takes effect.
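One possibility I have not verified (an assumption on my part): this setting
only affects the in-memory columnar format that cacheTable builds, so it would
have to be set before the table is cached to make any difference:

// set compression first, then build the in-memory columnar cache
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
sqlContext.cacheTable("DAgents")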

d) I converted the comma-separated HDFS files to Parquet format in HDFS, read
them back as Parquet, and ran the query on them.

DAgents.saveAsParquetFile("DAgents.parquet")
FCDRDD.saveAsParquetFile("ContactDetail.parquet")


val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
DAgentsParquetRDD.registerAsTable("DAgentsParquet")

val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")

val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet,
DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and
DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*The query time is actually higher for this join query.* It ended up taking
3.4 min, with more data (2 GB) read in shuffle reads. Parquet performed worse
than non-Parquet for this join.

I then ran the reversed query (table order and WHERE clause order flipped)
against the Parquet tables:

val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet,
ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and
DAgentsParquet.f1 = ContactDetailParquet.f6")
CDJoinQryParquetReversed.map(ta => ta(4)).count

It had been running for 18 min and I had to kill it, as it kept on going.

*But for queries with no join, Parquet's performance was extremely good.*
For example, the query below, which has no join, ran in 8 seconds, whereas
the same query on the non-Parquet data took 30 seconds.
val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet
WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count
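That matches what I would expect from a columnar format, and it suggests one
more thing to try (untested on my side): since Parquet reads only the columns
a query touches, selecting specific fields instead of * should cut the I/O
further on these 60-90 field tables. A sketch (f4 is just an illustrative
column name):

// only f4 and f6 should be read from disk, instead of all 60-90 fields
val CDQryPruned = sqlContext.sql("SELECT f4 FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
CDQryPruned.count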

*Some potential conclusions (please comment):*
* Order in the WHERE clause seems to matter to the Spark SQL optimizer. In
the relational DBs I have worked with, WHERE clause order is at most a hint.
It would be nice if the Spark SQL optimizer were fixed to ignore clause order
and optimize automatically.
* Changing just the table order in the SELECT statement for a join also seems
to matter when reading data from HDFS (for Parquet, and to a lesser extent
for non-Parquet in my case), even when the WHERE clause order is the same. It
would be nice if the SQL optimizer handled this automatically too.
* Joins involving huge tables are costly. The fact and dimension concepts
from star schemas don't translate well to big data (Hadoop, Spark). It may be
better to de-normalize and store huge tables to avoid joins; joins seem to be
evil. (I have tried de-normalizing when using Cassandra, but that has its own
problem of full table scans when running ad-hoc queries where the keys are
not known.)

Regards,

Venkat



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html



Re: Spark SQL table Join, one task is taking long

2014-12-03 Thread Cheng Lian


Re: Spark SQL table Join, one task is taking long

2014-12-02 Thread Venkat Subramanian
Bumping this up.
Michael Armbrust, or anybody from the Spark SQL team?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20218.html



Spark SQL table Join, one task is taking long

2014-12-01 Thread Venkat Subramanian
Environment: Spark 1.1; 4-node Spark and Hadoop dev cluster, 6 cores and
32 GB RAM each; default serialization, standalone mode, no security.

Data was sqooped from a relational DB to HDFS and is partitioned uniformly
across HDFS. I am reading a fact table of about 8 GB and one small dim table
from HDFS, then joining them on a criterion. The driver runs in the Spark
shell on the Spark master.

ContactDetail and DAgents are already read as RDDs and registered as tables.
Each of these tables has 60 to 90 fields, and I am using a Product class.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE
ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")

CDJoinQry.map(ta => ta(4)).count   // result is a small number

This works fine and returns the result fine. The Hadoop mapPartition reads
and the creation of the RDDs are all fine. But in the count stage, I see that
one task (out of 200) does a huge amount of shuffle write (some 1 GB or more)
and takes about 1.1 min to complete out of the 1.2 min of total execution
time. This task usually falls around the three-quarter mark (say task 160 of
200). While that task runs, one CPU on one worker node goes to 100% for the
duration of the task. The rest of the tasks take a few ms each and do only
~5 MB of shuffle write. I have run it repeatedly, and this happens regardless
of which worker node that particular task runs on. I turned on Spark debug
logging on all nodes, but it was difficult to figure out from the logs where
the delay is. There are no errors or retries in the logs.
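One thing I plan to check is key skew in the join column (this is a guess,
and the snippet below assumes the 1.1 SQL parser and Row API behave as I
expect): if a handful of f6 values account for most of the fact rows, a
shuffled hash join would pile their data onto a few tasks.

// rough look at the join-key distribution in the fact table; a few very
// frequent f6 values would explain one task doing most of the shuffle work
val keyCounts = sqlContext.sql("SELECT f6, COUNT(1) FROM ContactDetail GROUP BY f6")
keyCounts.map(r => (r.getLong(1), r(0))).sortByKey(ascending = false).take(10).foreach(println)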

I am not sure if I can post the logs here for someone to look at; if so, I
can (about 10 MB). Also, I am not sure whether it is normal in such a table
join for one task to take most of the time. Let me know if you have any
suggestions.

Regards,

Venkat




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124.html