We want to extract data from MySQL and run the computation in Spark SQL. The EXPLAIN output for the query is shown below.
== Parsed Logical Plan == > 'Sort ['revenue DESC NULLS LAST], true > +- 'Aggregate ['n_name], ['n_name, 'SUM(('l_extendedprice * (1 - > 'l_discount))) AS revenue#329] > +- 'Filter ((((('c_custkey = 'o_custkey) && ('l_orderkey = > 'o_orderkey)) && ('l_suppkey = 's_suppkey)) && (('c_nationkey = > 's_nationkey) && ('s_nationkey = 'n_nationkey))) && ((('n_regionkey = > 'r_regionkey) && ('r_name = AFRICA)) && (('o_orderdate >= 1993-01-01) && > ('o_orderdate < 1994-01-01)))) > +- 'Join Inner > :- 'Join Inner > : :- 'Join Inner > : : :- 'Join Inner > : : : :- 'Join Inner > : : : : :- 'UnresolvedRelation `customer` > : : : : +- 'UnresolvedRelation `orders` > : : : +- 'UnresolvedRelation `lineitem` > : : +- 'UnresolvedRelation `supplier` > : +- 'UnresolvedRelation `nation` > +- 'UnresolvedRelation `region` > == Analyzed Logical Plan == > n_name: string, revenue: decimal(38,4) > Sort [revenue#329 DESC NULLS LAST], true > +- Aggregate [n_name#176], [n_name#176, > sum(CheckOverflow((promote_precision(cast(l_extendedprice#68 as > decimal(16,2))) * > promote_precision(cast(CheckOverflow((promote_precision(cast(cast(1 as > decimal(1,0)) as decimal(16,2))) - promote_precision(cast(l_discount#69 as > decimal(16,2)))), DecimalType(16,2)) as decimal(16,2)))), > DecimalType(32,4))) AS revenue#329] > +- Filter (((((c_custkey#273 = o_custkey#1) && (l_orderkey#63 = > o_orderkey#0)) && (l_suppkey#65 = s_suppkey#224)) && ((c_nationkey#276 = > s_nationkey#227) && (s_nationkey#227 = n_nationkey#175))) && > (((n_regionkey#177 = r_regionkey#203) && (r_name#204 = AFRICA)) && > ((cast(o_orderdate#4 as string) >= 1993-01-01) && (cast(o_orderdate#4 as > string) < 1994-01-01)))) > +- Join Inner > :- Join Inner > : :- Join Inner > : : :- Join Inner > : : : :- Join Inner > : : : : :- SubqueryAlias customer > : : : : : +- > Relation[C_CUSTKEY#273,C_NAME#274,C_ADDRESS#275,C_NATIONKEY#276,C_PHONE#277,C_ACCTBAL#278,C_MKTSEGMENT#279,C_COMMENT#280] > JDBCRelation(customer) [numPartitions=1] > : : : : +- 
SubqueryAlias orders > : : : : +- > Relation[O_ORDERKEY#0,O_CUSTKEY#1,O_ORDERSTATUS#2,O_TOTALPRICE#3,O_ORDERDATE#4,O_ORDERPRIORITY#5,O_CLERK#6,O_SHIPPRIORITY#7,O_COMMENT#8] > JDBCRelation(orders) [numPartitions=1] > : : : +- SubqueryAlias lineitem > : : : +- > Relation[L_ORDERKEY#63,L_PARTKEY#64,L_SUPPKEY#65,L_LINENUMBER#66,L_QUANTITY#67,L_EXTENDEDPRICE#68,L_DISCOUNT#69,L_TAX#70,L_RETURNFLAG#71,L_LINESTATUS#72,L_SHIPDATE#73,L_COMMITDATE#74,L_RECEIPTDATE#75,L_SHIPINSTRUCT#76,L_SHIPMODE#77,L_COMMENT#78] > JDBCRelation(lineitem) [numPartitions=1] > : : +- SubqueryAlias supplier > : : +- > Relation[S_SUPPKEY#224,S_NAME#225,S_ADDRESS#226,S_NATIONKEY#227,S_PHONE#228,S_ACCTBAL#229,S_COMMENT#230] > JDBCRelation(supplier) [numPartitions=1] > : +- SubqueryAlias nation > : +- > Relation[N_NATIONKEY#175,N_NAME#176,N_REGIONKEY#177,N_COMMENT#178] > JDBCRelation(nation) [numPartitions=1] > +- SubqueryAlias region > +- Relation[R_REGIONKEY#203,R_NAME#204,R_COMMENT#205] > JDBCRelation(region) [numPartitions=1] > == Optimized Logical Plan == > Sort [revenue#329 DESC NULLS LAST], true > +- Aggregate [n_name#176], [n_name#176, > sum(CheckOverflow((promote_precision(cast(l_extendedprice#68 as > decimal(16,2))) * promote_precision(CheckOverflow((1.00 - > promote_precision(cast(l_discount#69 as decimal(16,2)))), > DecimalType(16,2)))), DecimalType(32,4))) AS revenue#329] > +- Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69, N_NAME#176] > +- Join Inner, (n_regionkey#177 = r_regionkey#203) > :- Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69, N_NAME#176, > N_REGIONKEY#177] > : +- Join Inner, (s_nationkey#227 = n_nationkey#175) > : :- Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69, > S_NATIONKEY#227] > : : +- Join Inner, ((l_suppkey#65 = s_suppkey#224) && > (c_nationkey#276 = s_nationkey#227)) > : : :- Project [C_NATIONKEY#276, L_SUPPKEY#65, > L_EXTENDEDPRICE#68, L_DISCOUNT#69] > : : : +- Join Inner, (l_orderkey#63 = o_orderkey#0) > : : : :- Project [C_NATIONKEY#276, O_ORDERKEY#0] > : : : : +- Join 
Inner, (c_custkey#273 = o_custkey#1) > : : : : :- Project [C_CUSTKEY#273, > C_NATIONKEY#276] > : : : : : +- Filter (isnotnull(c_custkey#273) && > isnotnull(c_nationkey#276)) > : : : : : +- InMemoryRelation [C_CUSTKEY#273, > C_NAME#274, C_ADDRESS#275, C_NATIONKEY#276, C_PHONE#277, C_ACCTBAL#278, > C_MKTSEGMENT#279, C_COMMENT#280], true, 10000, StorageLevel(disk, memory, 1 > replicas) > : : : : : +- *(1) Scan > JDBCRelation(customer) [numPartitions=1] > [C_CUSTKEY#273,C_NAME#274,C_ADDRESS#275,C_NATIONKEY#276,C_PHONE#277,C_ACCTBAL#278,C_MKTSEGMENT#279,C_COMMENT#280] > PushedFilters: [], ReadSchema: > struct<C_CUSTKEY:int,C_NAME:string,C_ADDRESS:string,C_NATIONKEY:int,C_PHONE:string,C_ACCTBAL:deci... > : : : : +- Project [O_ORDERKEY#0, O_CUSTKEY#1] > : : : : +- Filter ((((isnotnull(o_orderdate#4) > && (cast(o_orderdate#4 as string) >= 1993-01-01)) && (cast(o_orderdate#4 as > string) < 1994-01-01)) && isnotnull(o_custkey#1)) && > isnotnull(o_orderkey#0)) > : : : : +- InMemoryRelation [O_ORDERKEY#0, > O_CUSTKEY#1, O_ORDERSTATUS#2, O_TOTALPRICE#3, O_ORDERDATE#4, > O_ORDERPRIORITY#5, O_CLERK#6, O_SHIPPRIORITY#7, O_COMMENT#8], true, 10000, > StorageLevel(disk, memory, 1 replicas) > : : : : +- *(1) Scan > JDBCRelation(orders) [numPartitions=1] > [O_ORDERKEY#0,O_CUSTKEY#1,O_ORDERSTATUS#2,O_TOTALPRICE#3,O_ORDERDATE#4,O_ORDERPRIORITY#5,O_CLERK#6,O_SHIPPRIORITY#7,O_COMMENT#8] > PushedFilters: [], ReadSchema: > struct<O_ORDERKEY:int,O_CUSTKEY:int,O_ORDERSTATUS:string,O_TOTALPRICE:decimal(15,2),O_ORDERDATE:d... 
> : : : +- Project [L_ORDERKEY#63, L_SUPPKEY#65, > L_EXTENDEDPRICE#68, L_DISCOUNT#69] > : : : +- Filter (isnotnull(l_orderkey#63) && > isnotnull(l_suppkey#65)) > : : : +- InMemoryRelation [L_ORDERKEY#63, > L_PARTKEY#64, L_SUPPKEY#65, L_LINENUMBER#66, L_QUANTITY#67, > L_EXTENDEDPRICE#68, L_DISCOUNT#69, L_TAX#70, L_RETURNFLAG#71, > L_LINESTATUS#72, L_SHIPDATE#73, L_COMMITDATE#74, L_RECEIPTDATE#75, > L_SHIPINSTRUCT#76, L_SHIPMODE#77, L_COMMENT#78], true, 10000, > StorageLevel(disk, memory, 1 replicas) > : : : +- *(1) Scan JDBCRelation(lineitem) > [numPartitions=1] > [L_ORDERKEY#63,L_PARTKEY#64,L_SUPPKEY#65,L_LINENUMBER#66,L_QUANTITY#67,L_EXTENDEDPRICE#68,L_DISCOUNT#69,L_TAX#70,L_RETURNFLAG#71,L_LINESTATUS#72,L_SHIPDATE#73,L_COMMITDATE#74,L_RECEIPTDATE#75,L_SHIPINSTRUCT#76,L_SHIPMODE#77,L_COMMENT#78] > PushedFilters: [], ReadSchema: > struct<L_ORDERKEY:int,L_PARTKEY:int,L_SUPPKEY:int,L_LINENUMBER:int,L_QUANTITY:decimal(15,2),L_EXT... > : : +- Project [S_SUPPKEY#224, S_NATIONKEY#227] > : : +- Filter (isnotnull(s_suppkey#224) && > isnotnull(s_nationkey#227)) > : : +- InMemoryRelation [S_SUPPKEY#224, S_NAME#225, > S_ADDRESS#226, S_NATIONKEY#227, S_PHONE#228, S_ACCTBAL#229, S_COMMENT#230], > true, 10000, StorageLevel(disk, memory, 1 replicas) > : : +- *(1) Scan JDBCRelation(supplier) > [numPartitions=1] > [S_SUPPKEY#224,S_NAME#225,S_ADDRESS#226,S_NATIONKEY#227,S_PHONE#228,S_ACCTBAL#229,S_COMMENT#230] > PushedFilters: [], ReadSchema: > struct<S_SUPPKEY:int,S_NAME:string,S_ADDRESS:string,S_NATIONKEY:int,S_PHONE:string,S_ACCTBAL:deci... 
> : +- Project [N_NATIONKEY#175, N_NAME#176, N_REGIONKEY#177] > : +- Filter (isnotnull(n_nationkey#175) && > isnotnull(n_regionkey#177)) > : +- InMemoryRelation [N_NATIONKEY#175, N_NAME#176, > N_REGIONKEY#177, N_COMMENT#178], true, 10000, StorageLevel(disk, memory, 1 > replicas) > : +- *(1) Scan JDBCRelation(nation) > [numPartitions=1] > [N_NATIONKEY#175,N_NAME#176,N_REGIONKEY#177,N_COMMENT#178] PushedFilters: > [], ReadSchema: > struct<N_NATIONKEY:int,N_NAME:string,N_REGIONKEY:int,N_COMMENT:string> > +- Project [R_REGIONKEY#203] > +- Filter ((isnotnull(r_name#204) && (r_name#204 = AFRICA)) && > isnotnull(r_regionkey#203)) > +- InMemoryRelation [R_REGIONKEY#203, R_NAME#204, > R_COMMENT#205], true, 10000, StorageLevel(disk, memory, 1 replicas) > +- *(1) Scan JDBCRelation(region) [numPartitions=1] > [R_REGIONKEY#203,R_NAME#204,R_COMMENT#205] PushedFilters: [], ReadSchema: > struct<R_REGIONKEY:int,R_NAME:string,R_COMMENT:string> > == Physical Plan == > *(23) Sort [revenue#329 DESC NULLS LAST], true, 0 > +- Exchange rangepartitioning(revenue#329 DESC NULLS LAST, 200) > +- *(22) HashAggregate(keys=[n_name#176], > functions=[sum(CheckOverflow((promote_precision(cast(l_extendedprice#68 as > decimal(16,2))) * promote_precision(CheckOverflow((1.00 - > promote_precision(cast(l_discount#69 as decimal(16,2)))), > DecimalType(16,2)))), DecimalType(32,4)))], output=[n_name#176, > revenue#329]) > +- Exchange(coordinator id: 1149892111) hashpartitioning(n_name#176, > 200), coordinator[target post-shuffle partition size: 67108864] > +- *(21) HashAggregate(keys=[n_name#176], > functions=[partial_sum(CheckOverflow((promote_precision(cast(l_extendedprice#68 > as decimal(16,2))) * promote_precision(CheckOverflow((1.00 - > promote_precision(cast(l_discount#69 as decimal(16,2)))), > DecimalType(16,2)))), DecimalType(32,4)))], output=[n_name#176, sum#574]) > +- *(21) Project [L_EXTENDEDPRICE#68, L_DISCOUNT#69, > N_NAME#176] > +- *(21) SortMergeJoin [n_regionkey#177], > [r_regionkey#203], 
Inner > :- *(18) Sort [n_regionkey#177 ASC NULLS FIRST], false, 0 > : +- Exchange(coordinator id: 266374831) > hashpartitioning(n_regionkey#177, 200), coordinator[target post-shuffle > partition size: 67108864] > : +- *(17) Project [L_EXTENDEDPRICE#68, > L_DISCOUNT#69, N_NAME#176, N_REGIONKEY#177] > : +- *(17) SortMergeJoin [s_nationkey#227], > [n_nationkey#175], Inner > : :- *(14) Sort [s_nationkey#227 ASC NULLS > FIRST], false, 0 > : : +- Exchange(coordinator id: 1876548582) > hashpartitioning(s_nationkey#227, 200), coordinator[target post-shuffle > partition size: 67108864] > : : +- *(13) Project [L_EXTENDEDPRICE#68, > L_DISCOUNT#69, S_NATIONKEY#227] > : : +- *(13) SortMergeJoin > [l_suppkey#65, c_nationkey#276], [s_suppkey#224, s_nationkey#227], Inner > : : :- *(10) Sort [l_suppkey#65 ASC > NULLS FIRST, c_nationkey#276 ASC NULLS FIRST], false, 0 > : : : +- Exchange(coordinator id: > 2066777507) hashpartitioning(l_suppkey#65, c_nationkey#276, 200), > coordinator[target post-shuffle partition size: 67108864] > : : : +- *(9) Project > [C_NATIONKEY#276, L_SUPPKEY#65, L_EXTENDEDPRICE#68, L_DISCOUNT#69] > : : : +- *(9) SortMergeJoin > [o_orderkey#0], [l_orderkey#63], Inner > : : : :- *(6) Sort > [o_orderkey#0 ASC NULLS FIRST], false, 0 > : : : : +- > Exchange(coordinator id: 1879190852) hashpartitioning(o_orderkey#0, 200), > coordinator[target post-shuffle partition size: 67108864] > : : : : +- *(5) > Project [C_NATIONKEY#276, O_ORDERKEY#0] > : : : : +- *(5) > SortMergeJoin [c_custkey#273], [o_custkey#1], Inner > : : : : :- *(2) > Sort [c_custkey#273 ASC NULLS FIRST], false, 0 > : : : : : +- > Exchange(coordinator id: 2065453085) hashpartitioning(c_custkey#273, 200), > coordinator[target post-shuffle partition size: 67108864] > : : : : : +- > *(1) Filter (isnotnull(c_custkey#273) && isnotnull(c_nationkey#276)) > : : : : : > +- InMemoryTableScan [C_CUSTKEY#273, C_NATIONKEY#276], > [isnotnull(c_custkey#273), isnotnull(c_nationkey#276)] > : : : : : > +- InMemoryRelation 
[C_CUSTKEY#273, C_NAME#274, C_ADDRESS#275, > C_NATIONKEY#276, C_PHONE#277, C_ACCTBAL#278, C_MKTSEGMENT#279, > C_COMMENT#280], true, 10000, StorageLevel(disk, memory, 1 replicas) > : : : : : > +- *(1) Scan JDBCRelation(customer) [numPartitions=1] > [C_CUSTKEY#273,C_NAME#274,C_ADDRESS#275,C_NATIONKEY#276,C_PHONE#277,C_ACCTBAL#278,C_MKTSEGMENT#279,C_COMMENT#280] > PushedFilters: [], ReadSchema: > struct<C_CUSTKEY:int,C_NAME:string,C_ADDRESS:string,C_NATIONKEY:int,C_PHONE:string,C_ACCTBAL:deci... > : : : : +- *(4) > Sort [o_custkey#1 ASC NULLS FIRST], false, 0 > : : : : +- > Exchange(coordinator id: 2065453085) hashpartitioning(o_custkey#1, 200), > coordinator[target post-shuffle partition size: 67108864] > : : : : +- > *(3) Project [O_ORDERKEY#0, O_CUSTKEY#1] > : : : : > +- *(3) Filter ((((isnotnull(o_orderdate#4) && (cast(o_orderdate#4 as > string) >= 1993-01-01)) && (cast(o_orderdate#4 as string) < 1994-01-01)) && > isnotnull(o_custkey#1)) && isnotnull(o_orderkey#0)) > : : : : > +- InMemoryTableScan [O_CUSTKEY#1, O_ORDERKEY#0, o_orderdate#4], > [isnotnull(o_orderdate#4), (cast(o_orderdate#4 as string) >= 1993-01-01), > (cast(o_orderdate#4 as string) < 1994-01-01), isnotnull(o_custkey#1), > isnotnull(o_orderkey#0)] > : : : : > +- InMemoryRelation [O_ORDERKEY#0, O_CUSTKEY#1, O_ORDERSTATUS#2, > O_TOTALPRICE#3, O_ORDERDATE#4, O_ORDERPRIORITY#5, O_CLERK#6, > O_SHIPPRIORITY#7, O_COMMENT#8], true, 10000, StorageLevel(disk, memory, 1 > replicas) > : : : : > +- *(1) Scan JDBCRelation(orders) [numPartitions=1] > [O_ORDERKEY#0,O_CUSTKEY#1,O_ORDERSTATUS#2,O_TOTALPRICE#3,O_ORDERDATE#4,O_ORDERPRIORITY#5,O_CLERK#6,O_SHIPPRIORITY#7,O_COMMENT#8] > PushedFilters: [], ReadSchema: > struct<O_ORDERKEY:int,O_CUSTKEY:int,O_ORDERSTATUS:string,O_TOTALPRICE:decimal(15,2),O_ORDERDATE:d... 
> : : : +- *(8) Sort > [l_orderkey#63 ASC NULLS FIRST], false, 0 > : : : +- > Exchange(coordinator id: 1879190852) hashpartitioning(l_orderkey#63, 200), > coordinator[target post-shuffle partition size: 67108864] > : : : +- *(7) Filter > (isnotnull(l_orderkey#63) && isnotnull(l_suppkey#65)) > : : : +- > InMemoryTableScan [L_ORDERKEY#63, L_SUPPKEY#65, L_EXTENDEDPRICE#68, > L_DISCOUNT#69], [isnotnull(l_orderkey#63), isnotnull(l_suppkey#65)] > : : : +- > InMemoryRelation [L_ORDERKEY#63, L_PARTKEY#64, L_SUPPKEY#65, > L_LINENUMBER#66, L_QUANTITY#67, L_EXTENDEDPRICE#68, L_DISCOUNT#69, > L_TAX#70, L_RETURNFLAG#71, L_LINESTATUS#72, L_SHIPDATE#73, L_COMMITDATE#74, > L_RECEIPTDATE#75, L_SHIPINSTRUCT#76, L_SHIPMODE#77, L_COMMENT#78], true, > 10000, StorageLevel(disk, memory, 1 replicas) > : : : > +- *(1) Scan JDBCRelation(lineitem) [numPartitions=1] > [L_ORDERKEY#63,L_PARTKEY#64,L_SUPPKEY#65,L_LINENUMBER#66,L_QUANTITY#67,L_EXTENDEDPRICE#68,L_DISCOUNT#69,L_TAX#70,L_RETURNFLAG#71,L_LINESTATUS#72,L_SHIPDATE#73,L_COMMITDATE#74,L_RECEIPTDATE#75,L_SHIPINSTRUCT#76,L_SHIPMODE#77,L_COMMENT#78] > PushedFilters: [], ReadSchema: > struct<L_ORDERKEY:int,L_PARTKEY:int,L_SUPPKEY:int,L_LINENUMBER:int,L_QUANTITY:decimal(15,2),L_EXT... 
> 18/04/11 10:44:54 INFO BlockManagerMasterEndpoint: Registering block > manager 172.16.50.103:22341 with 15.8 GB RAM, BlockManagerId(2, > 172.16.50.103, 22341, None) > : : +- *(12) Sort [s_suppkey#224 ASC > NULLS FIRST, s_nationkey#227 ASC NULLS FIRST], false, 0 > : : +- Exchange(coordinator id: > 2066777507) hashpartitioning(s_suppkey#224, s_nationkey#227, 200), > coordinator[target post-shuffle partition size: 67108864] > : : +- *(11) Filter > (isnotnull(s_suppkey#224) && isnotnull(s_nationkey#227)) > : : +- InMemoryTableScan > [S_SUPPKEY#224, S_NATIONKEY#227], [isnotnull(s_suppkey#224), > isnotnull(s_nationkey#227)] > : : +- > InMemoryRelation [S_SUPPKEY#224, S_NAME#225, S_ADDRESS#226, > S_NATIONKEY#227, S_PHONE#228, S_ACCTBAL#229, S_COMMENT#230], true, 10000, > StorageLevel(disk, memory, 1 replicas) > : : +- *(1) > Scan JDBCRelation(supplier) [numPartitions=1] > [S_SUPPKEY#224,S_NAME#225,S_ADDRESS#226,S_NATIONKEY#227,S_PHONE#228,S_ACCTBAL#229,S_COMMENT#230] > PushedFilters: [], ReadSchema: > struct<S_SUPPKEY:int,S_NAME:string,S_ADDRESS:string,S_NATIONKEY:int,S_PHONE:string,S_ACCTBAL:deci... 
> : +- *(16) Sort [n_nationkey#175 ASC NULLS > FIRST], false, 0 > : +- Exchange(coordinator id: 1876548582) > hashpartitioning(n_nationkey#175, 200), coordinator[target post-shuffle > partition size: 67108864] > : +- *(15) Filter > (isnotnull(n_nationkey#175) && isnotnull(n_regionkey#177)) > : +- InMemoryTableScan > [N_NATIONKEY#175, N_NAME#176, N_REGIONKEY#177], > [isnotnull(n_nationkey#175), isnotnull(n_regionkey#177)] > : +- InMemoryRelation > [N_NATIONKEY#175, N_NAME#176, N_REGIONKEY#177, N_COMMENT#178], true, 10000, > StorageLevel(disk, memory, 1 replicas) > : +- *(1) Scan > JDBCRelation(nation) [numPartitions=1] > [N_NATIONKEY#175,N_NAME#176,N_REGIONKEY#177,N_COMMENT#178] PushedFilters: > [], ReadSchema: > struct<N_NATIONKEY:int,N_NAME:string,N_REGIONKEY:int,N_COMMENT:string> > +- *(20) Sort [r_regionkey#203 ASC NULLS FIRST], false, 0 > +- Exchange(coordinator id: 266374831) > hashpartitioning(r_regionkey#203, 200), coordinator[target post-shuffle > partition size: 67108864] > +- *(19) Project [R_REGIONKEY#203] > +- *(19) Filter ((isnotnull(r_name#204) && > (r_name#204 = AFRICA)) && isnotnull(r_regionkey#203)) > +- InMemoryTableScan [R_REGIONKEY#203, > r_name#204], [isnotnull(r_name#204), (r_name#204 = AFRICA), > isnotnull(r_regionkey#203)] > +- InMemoryRelation [R_REGIONKEY#203, > R_NAME#204, R_COMMENT#205], true, 10000, StorageLevel(disk, memory, 1 > replicas) > +- *(1) Scan > JDBCRelation(region) [numPartitions=1] > [R_REGIONKEY#203,R_NAME#204,R_COMMENT#205] PushedFilters: [], ReadSchema: > struct<R_REGIONKEY:int,R_NAME:string,R_COMMENT:string> As you can see, every JDBCRelation is converted to an InMemoryRelation. Because the JDBC tables are so large, not all of the data fits in memory. Is there an option to make Spark SQL use disk when there is not enough memory?