Hey Venkat,

This behavior seems reasonable. Judging from the table names, I guess |DAgents| should be the fact table here and |ContactDetails| the dim table. Below is an explanation of a similar query; you may read |src| as |DAgents| and |src1| as |ContactDetails|.

0: jdbc:hive2://localhost:10000> explain extended select * from src, src1
where src.key = src1.key and src.key = 100;

== Parsed Logical Plan ==
'Project [*]
 'Filter (('src.key = 'src1.key) && ('src.key = 100))
  'Join Inner, None
   'UnresolvedRelation None, src, None
   'UnresolvedRelation None, src1, None

== Analyzed Logical Plan ==
Project [key#81,value#82,key#83,value#84]
 Filter ((key#81 = key#83) && (key#81 = 100))
  Join Inner, None
   MetastoreRelation default, src, None
   MetastoreRelation default, src1, None

== Optimized Logical Plan ==
Project [key#81,value#82,key#83,value#84]
 Join Inner, Some((key#81 = key#83))
  Filter (key#81 = 100)
   MetastoreRelation default, src, None
  MetastoreRelation default, src1, None

== Physical Plan ==
Project [key#81,value#82,key#83,value#84]
 ShuffledHashJoin [key#81], [key#83], BuildRight
  Exchange (HashPartitioning [key#81], 200)
   Filter (key#81 = 100)
    HiveTableScan [key#81,value#82], (MetastoreRelation default, src, None), None
  Exchange (HashPartitioning [key#83], 200)
   HiveTableScan [key#83,value#84], (MetastoreRelation default, src1, None), None

Code Generation: false

== RDD ==

Please notice the |Filter| node in the physical plan. Because the predicate pins the join key to a single value, every row that survives the filter carries the same key and therefore hashes to the same shuffle partition. In your case, all the filtered rows are shuffled into a single partition because |DAgents.f1| is both the predicate key and the shuffle key, and that partition is handled by the one task that lasts for more than 1 second. All the other tasks in the count stage take only a few ms because they don't receive any rows from |DAgents|.
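To make the skew concrete, here is a minimal Spark shell sketch (purely illustrative, not code from your job) showing that with 200 shuffle partitions every row carrying the same join key maps to one and the same partition:

    // The physical plan above uses HashPartitioning with 200 partitions.
    import org.apache.spark.HashPartitioner

    val partitioner = new HashPartitioner(200)

    // All rows that survive Filter (f1 = 902) carry the same key,
    // so they are all assigned to this single partition / reduce task.
    val p = partitioner.getPartition(902)
    println(s"rows with key 902 go to partition $p")

That single reduce task receives essentially all of the shuffled data, which matches the roughly 1 GB of shuffle write you observed on one task.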

If |ContactDetails| is small enough, you can cache |ContactDetails| first and set |spark.sql.autoBroadcastJoinThreshold| to a value larger than the size of |ContactDetails|. A broadcast join rather than a shuffled hash join would then be performed, which usually results in better performance.
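For example, a rough sketch from the Spark shell (assuming the dim table is registered under the name |ContactDetails|; use whatever name you actually registered, and adjust the threshold to the real table size):

    // Cache the dim table so that size statistics for it are available.
    sqlContext.cacheTable("ContactDetails")
    // Touch the table once to materialize the cache (harmless if already cached).
    sqlContext.sql("SELECT COUNT(*) FROM ContactDetails").collect()

    // Raise the broadcast threshold (in bytes) above the cached table size,
    // e.g. 100 MB here.
    sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")

    // Re-run the query; EXPLAIN should now show a broadcast join instead of
    // ShuffledHashJoin, so the larger table no longer needs to be shuffled.
    sqlContext.sql("EXPLAIN SELECT * FROM ContactDetail, DAgents " +
      "WHERE ContactDetail.f6 = DAgents.f1 AND DAgents.f1 = 902").collect().foreach(println)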

Cheng

On 12/2/14 6:35 AM, Venkat Subramanian wrote:

Environment: Spark 1.1, 4-node Spark and Hadoop dev cluster - 6 cores, 32 GB
RAM each. Default serialization, standalone, no security.

Data was sqooped from a relational DB to HDFS, and the data is partitioned
uniformly across HDFS. I am reading a fact table about 8 GB in size and one small
dim table from HDFS and then joining them on a criterion.
The driver is running in the Spark shell on the Spark master.

ContactDetail and DAgents are read as RDDs and already registered as tables.
Each of these tables has 60 to 90 fields, and I am using a Product class.

val CDJoinQry= sqlContext.sql("SELECT  * FROM ContactDetail, DAgents  WHERE
ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")

CDJoinQry.map(ta => ta(4)).count   // result is a small number

This works fine and returns the correct result. The Hadoop mapPartition reads and
the creation of RDDs are all fine, but in the count stage I see that one task
(out of 200) does a huge amount of shuffle write (some 1 GB or more)
and takes about 1.1 seconds to complete out of the 1.2 seconds of total
execution time. This task usually falls around the 3/4 mark (say 160/200) of
the total tasks. While that task is running, one CPU on one worker node goes to
100% for the duration of the task. The rest of the tasks take a few ms and do
only < 5 MB of shuffle write each. I have run it repeatedly, and this happens
regardless of which worker node that particular task runs on. I turned on Spark
debug logging on all nodes to understand, but it was difficult to figure out
where the delay is from the logs. There are no errors or retries in the logs.

Not sure if I can post logs here for someone to look at; if so, I can (about
10 MB). Also, not sure whether it is normal in such a table join for one task
to take most of the time. Let me know if you have any suggestions.

Regards,

Venkat




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
