I wrote the query below and I am trying to understand its execution plan.
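For context, the two views were registered roughly like this before running the SQL (a minimal sketch, not the exact job code; the path is the one that appears in the FileScan nodes below, and the read options are my assumptions):

# Rough sketch of the setup: both views read CSV data and are exposed
# to SQL as temp views named df and df1 (header/inferSchema are assumptions).
df = spark.read.csv(
    "file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no",
    header=True, inferSchema=True,
)
df.createOrReplaceTempView("df")

df1 = spark.read.csv(
    "file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no",
    header=True, inferSchema=True,
)
df1.createOrReplaceTempView("df1")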
>>> spark.sql("select a.CustomerID,a.CustomerName,b.state from df a join df1 b on a.CustomerID=b.CustomerID").explain(mode="extended")

== Parsed Logical Plan ==
'Project ['a.CustomerID, 'a.CustomerName, 'b.state]
+- 'Join Inner, ('a.CustomerID = 'b.CustomerID)
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation [df], [], false
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation [df1], [], false

== Analyzed Logical Plan ==
CustomerID: int, CustomerName: string, state: string
Project [CustomerID#640, CustomerName#641, state#988]
+- Join Inner, (CustomerID#640 = CustomerID#978)
   :- SubqueryAlias a
   :  +- SubqueryAlias df
   :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
   +- SubqueryAlias b
      +- SubqueryAlias df1
         +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv

== Optimized Logical Plan ==
Project [CustomerID#640, CustomerName#641, state#988]
+- Join Inner, (CustomerID#640 = CustomerID#978)
   :- Project [CustomerID#640, CustomerName#641]
   :  +- Filter isnotnull(CustomerID#640)
   :     +- Relation[CustomerID#640,CustomerName#641,Pincode#642,DOB#643,CompletedAddressWithPincode#644,ContactDetailsPhone1#645,ContactDetailsPhone2#646,ContactDetailsMobile#647,ContactDetailsEmail#648,City#649,State#650,ComplaintFlag#651,ProfileCompletenessPercentage#652,WhatsAppOptIn#653,HNINonHNI#654,S2SFlag#655] csv
   +- Project [CustomerID#978, State#988]
      +- Filter isnotnull(CustomerID#978)
         +- Relation[CustomerID#978,CustomerName#979,Pincode#980,DOB#981,CompletedAddressWithPincode#982,ContactDetailsPhone1#983,ContactDetailsPhone2#984,ContactDetailsMobile#985,ContactDetailsEmail#986,City#987,State#988,ComplaintFlag#989,ProfileCompletenessPercentage#990,WhatsAppOptIn#991,HNINonHNI#992,S2SFlag#993] csv

== Physical Plan ==
*(5) Project [CustomerID#640, CustomerName#641, state#988]
+- *(5) SortMergeJoin [CustomerID#640], [CustomerID#978], Inner
   :- *(2) Sort [CustomerID#640 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(CustomerID#640, 200), ENSURE_REQUIREMENTS, [id=#451]
   :     +- *(1) Filter isnotnull(CustomerID#640)
   :        +- FileScan csv [CustomerID#640,CustomerName#641] Batched: false, DataFilters: [isnotnull(CustomerID#640)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,CustomerName:string>
   +- *(4) Sort [CustomerID#978 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(CustomerID#978, 200), ENSURE_REQUIREMENTS, [id=#459]
         +- *(3) Filter isnotnull(CustomerID#978)
            +- FileScan csv [CustomerID#978,State#988] Batched: false, DataFilters: [isnotnull(CustomerID#978)], Format: CSV, Location: InMemoryFileIndex[file:/home/siddhesh/Documents/BAXA/csv_output/bulk_with_only_no], PartitionFilters: [], PushedFilters: [IsNotNull(CustomerID)], ReadSchema: struct<CustomerID:int,State:string>

I understand some of the operators: Project corresponds to the select clause, and Filter corresponds to whatever filters the query applies. Where in this plan can I look for cost optimization?
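In case it matters for the answer: I understand explain() also takes other modes, and if I read the PySpark docs right, mode="cost" prints the optimized logical plan together with statistics (such as sizeInBytes) where Spark has them. Is that where the cost side of the optimizer shows up?

# Same query, asking for the plan with statistics attached (PySpark 3.0+).
spark.sql("""
    select a.CustomerID, a.CustomerName, b.state
    from df a join df1 b on a.CustomerID = b.CustomerID
""").explain(mode="cost")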
Suppose in the future this query starts taking longer to execute. By looking at this plan, how can I figure out what exactly is happening and what needs to change on the query side? Also, Spark uses a sort-merge join by default, as I can see from the plan, but when does it opt for a Sort-Merge Join and when for a Shuffle-Hash Join?
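For reference, this is the kind of experiment I was planning in order to compare the two strategies; the hint syntax is my reading of the Spark 3.x docs, not something already in this job:

# Nudge Spark toward a shuffle-hash join via a hint (Spark 3.0+), then
# compare the resulting physical plan against the SortMergeJoin plan above.
spark.sql("""
    select /*+ SHUFFLE_HASH(b) */ a.CustomerID, a.CustomerName, b.state
    from df a join df1 b on a.CustomerID = b.CustomerID
""").explain()

# Configs that, as far as I understand, feed into the strategy choice:
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # broadcast cutoff
print(spark.conf.get("spark.sql.join.preferSortMergeJoin"))    # "true" by default

Thanks, Sid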