I don’t see how this explains the time differences.

Dudu
From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, July 12, 2016 10:56 AM
To: user <user@hive.apache.org>
Cc: user @spark <u...@spark.apache.org>
Subject: Re: Using Spark on Hive with Hive also using Spark as its execution engine

This is the whole idea: Spark uses a DAG plus in-memory processing, while MR is the classic map-reduce engine.

This is the plan for Hive on Spark:

hive> explain select max(id) from dummy_parquet;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Spark
      Edges:
        Reducer 2 <- Map 1 (GROUP, 1)
      DagName: hduser_20160712083219_632c2749-7387-478f-972d-9eaadd9932c6:1
      Vertices:
        Map 1
            Map Operator Tree:
                TableScan
                  alias: dummy_parquet
                  Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: id (type: int)
                    outputColumnNames: id
                    Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
                    Group By Operator
                      aggregations: max(id)
                      mode: hash
                      outputColumnNames: _col0
                      Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                      Reduce Output Operator
                        sort order:
                        Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                        value expressions: _col0 (type: int)
        Reducer 2
            Reduce Operator Tree:
              Group By Operator
                aggregations: max(VALUE._col0)
                mode: mergepartial
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.TextInputFormat
                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 2.801 seconds, Fetched: 50 row(s)

And this is the plan with the execution engine set to MR:

hive> set hive.execution.engine=mr;
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> explain select max(id) from dummy_parquet;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: dummy_parquet
            Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int)
              outputColumnNames: id
              Statistics: Num rows: 100000000 Data size: 700000000 Basic stats: COMPLETE Column stats: NONE
              Group By Operator
                aggregations: max(id)
                mode: hash
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                Reduce Output Operator
                  sort order:
                  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                  value expressions: _col0 (type: int)
      Reduce Operator Tree:
        Group By Operator
          aggregations: max(VALUE._col0)
          mode: mergepartial
          outputColumnNames: _col0
          Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.1 seconds, Fetched: 44 row(s)

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On 12 July 2016 at 08:16, Markovitz, Dudu <dmarkov...@paypal.com> wrote:

This is a simple task: read the files, find the local max value, and combine the results (find the global max value). How do you explain the differences in the results? Spark reads the files and finds a local max 10x(+) faster than MR? Can you please attach the execution plan?

Thanks

Dudu

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Monday, July 11, 2016 11:55 PM
To: user <user@hive.apache.org>; user @spark <u...@spark.apache.org>
Subject: Re: Using Spark on Hive with Hive also using Spark as its execution engine

In my test I compared like for like, keeping the setup the same, namely:

1. The table was a Parquet table of 100 million rows.
2. The same setup was used for both Hive on Spark and Hive on MR.
3. Spark was very impressive compared to MR on this particular test.
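(For reference, the like-for-like toggle described above amounts to the following minimal sketch; the table name dummy_parquet is taken from the plans earlier in the thread, and both runs are assumed to be timed in the same session:)

hive> set hive.execution.engine=spark;
hive> select max(id) from dummy_parquet;   -- timed run on the Spark engine
hive> set hive.execution.engine=mr;
hive> select max(id) from dummy_parquet;   -- identical query timed on the MR engine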
Just to see if there were any issues, I created an ORC table in the image of the Parquet one (insert/select from Parquet to ORC), with stats updated for columns etc. These were the results of the same run using the ORC table this time:

hive> select max(id) from oraclehadoop.dummy;

Starting Spark Job = b886b869-5500-4ef7-aab9-ae6fb4dad22b
Query Hive on Spark job[1] stages:
2
3
Status: Running (Hive on Spark job[1])
Job Progress Format
CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]
2016-07-11 21:35:45,020 Stage-2_0: 0(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:48,033 Stage-2_0: 0(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:51,046 Stage-2_0: 1(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:52,050 Stage-2_0: 3(+8)/23     Stage-3_0: 0/1
2016-07-11 21:35:53,055 Stage-2_0: 8(+4)/23     Stage-3_0: 0/1
2016-07-11 21:35:54,060 Stage-2_0: 11(+1)/23    Stage-3_0: 0/1
2016-07-11 21:35:55,065 Stage-2_0: 12(+0)/23    Stage-3_0: 0/1
2016-07-11 21:35:56,071 Stage-2_0: 12(+8)/23    Stage-3_0: 0/1
2016-07-11 21:35:57,076 Stage-2_0: 13(+8)/23    Stage-3_0: 0/1
2016-07-11 21:35:58,081 Stage-2_0: 20(+3)/23    Stage-3_0: 0/1
2016-07-11 21:35:59,085 Stage-2_0: 23/23 Finished       Stage-3_0: 0(+1)/1
2016-07-11 21:36:00,089 Stage-2_0: 23/23 Finished       Stage-3_0: 1/1 Finished
Status: Finished successfully in 16.08 seconds
OK
100000000
Time taken: 17.775 seconds, Fetched: 1 row(s)

Repeat with the MR engine:

hive> set hive.execution.engine=mr;
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> select max(id) from oraclehadoop.dummy;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = hduser_20160711213100_8dc2afae-8644-4097-ba33-c7bd3c304bf8
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1468226887011_0008, Tracking URL = http://rhes564:8088/proxy/application_1468226887011_0008/
Kill Command = /home/hduser/hadoop-2.6.0/bin/hadoop job -kill job_1468226887011_0008
Hadoop job information for Stage-1: number of mappers: 23; number of reducers: 1
2016-07-11 21:37:00,061 Stage-1 map = 0%,  reduce = 0%
2016-07-11 21:37:06,440 Stage-1 map = 4%,  reduce = 0%, Cumulative CPU 16.48 sec
2016-07-11 21:37:14,751 Stage-1 map = 9%,  reduce = 0%, Cumulative CPU 40.63 sec
2016-07-11 21:37:22,048 Stage-1 map = 13%,  reduce = 0%, Cumulative CPU 58.88 sec
2016-07-11 21:37:30,412 Stage-1 map = 17%,  reduce = 0%, Cumulative CPU 80.72 sec
2016-07-11 21:37:37,707 Stage-1 map = 22%,  reduce = 0%, Cumulative CPU 103.43 sec
2016-07-11 21:37:45,999 Stage-1 map = 26%,  reduce = 0%, Cumulative CPU 125.93 sec
2016-07-11 21:37:54,300 Stage-1 map = 30%,  reduce = 0%, Cumulative CPU 147.17 sec
2016-07-11 21:38:01,538 Stage-1 map = 35%,  reduce = 0%, Cumulative CPU 166.56 sec
2016-07-11 21:38:08,807 Stage-1 map = 39%,  reduce = 0%, Cumulative CPU 189.29 sec
2016-07-11 21:38:17,115 Stage-1 map = 43%,  reduce = 0%, Cumulative CPU 211.03 sec
2016-07-11 21:38:24,363 Stage-1 map = 48%,  reduce = 0%, Cumulative CPU 235.68 sec
2016-07-11 21:38:32,638 Stage-1 map = 52%,  reduce = 0%, Cumulative CPU 258.27 sec
2016-07-11 21:38:40,916 Stage-1 map = 57%,  reduce = 0%, Cumulative CPU 278.44 sec
2016-07-11 21:38:49,206 Stage-1 map = 61%,  reduce = 0%, Cumulative CPU 300.35 sec
2016-07-11 21:38:58,524 Stage-1 map = 65%,  reduce = 0%, Cumulative CPU 322.89 sec
2016-07-11 21:39:07,889 Stage-1 map = 70%,  reduce = 0%, Cumulative CPU 344.8 sec
2016-07-11 21:39:16,151 Stage-1 map = 74%,  reduce = 0%, Cumulative CPU 367.77 sec
2016-07-11 21:39:25,456 Stage-1 map = 78%,  reduce = 0%, Cumulative CPU 391.82 sec
2016-07-11 21:39:33,725 Stage-1 map = 83%,  reduce = 0%, Cumulative CPU 415.48 sec
2016-07-11 21:39:43,037 Stage-1 map = 87%,  reduce = 0%, Cumulative CPU 436.09 sec
2016-07-11 21:39:51,292 Stage-1 map = 91%,  reduce = 0%, Cumulative CPU 459.4 sec
2016-07-11 21:39:59,563 Stage-1 map = 96%,  reduce = 0%, Cumulative CPU 477.92 sec
2016-07-11 21:40:05,760 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 491.72 sec
2016-07-11 21:40:10,921 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 499.37 sec
MapReduce Total cumulative CPU time: 8 minutes 19 seconds 370 msec
Ended Job = job_1468226887011_0008
MapReduce Jobs Launched:
Stage-Stage-1: Map: 23  Reduce: 1   Cumulative CPU: 499.37 sec   HDFS Read: 403754774 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 8 minutes 19 seconds 370 msec
OK
100000000
Time taken: 202.333 seconds, Fetched: 1 row(s)

So in summary:

Table      MR/sec     Spark/sec
Parquet    239.532    14.38
ORC        202.333    17.77

Still, I would use Spark if I had a choice, and I agree that on VLTs (very large tables) the limitation in available memory may be the overriding factor in using Spark.

HTH

Dr Mich Talebzadeh

LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
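(A rough sketch of the "ORC table in the image of Parquet, with stats updated" step described above, assuming an unpartitioned source table; the qualified name oraclehadoop.dummy_parquet for the source is an assumption, while oraclehadoop.dummy is the ORC table queried above:)

hive> create table oraclehadoop.dummy stored as orc as select * from oraclehadoop.dummy_parquet;
hive> analyze table oraclehadoop.dummy compute statistics;
hive> analyze table oraclehadoop.dummy compute statistics for columns;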
On 11 July 2016 at 19:25, Gopal Vijayaraghavan <gop...@apache.org> wrote:

> Status: Finished successfully in 14.12 seconds
> OK
> 100000000
> Time taken: 14.38 seconds, Fetched: 1 row(s)

That might be an improvement over MR, but that still feels far too slow.

Parquet numbers are in general bad in Hive, but that's because the Parquet reader gets no actual love from the devs. The community, if it wants to keep using Parquet heavily, needs a Hive dev to go over to parquet-mr and cut a significant number of memory copies out of the reader.

The Spark 2.0 build, for instance, has a custom Parquet reader for SparkSQL which does this. SPARK-12854 does for Spark+Parquet what Hive 2.0 does for ORC (actually, it looks more like Hive's VectorizedRowBatch than Tungsten's flat layouts). But that reader cannot be used in Hive-on-Spark, because it is not a public reader impl.

Not to pick an arbitrary dataset, my workhorse example is a TPC-H lineitem at 10Gb scale on a single 16 core box.

hive(tpch_flat_orc_10)> select max(l_discount) from lineitem;
Query ID = gopal_20160711175917_f96371aa-2721-49c8-99a0-f7c4a1eacfda
Total jobs = 1
Launching Job 1 out of 1

Status: Running (Executing on YARN cluster with App id application_1466700718395_0256)

----------------------------------------------------------------------------------------------
        VERTICES      MODE        STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
----------------------------------------------------------------------------------------------
Map 1 ..........      llap     SUCCEEDED     13         13        0        0       0       0
Reducer 2 ......      llap     SUCCEEDED      1          1        0        0       0       0
----------------------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 0.71 s
----------------------------------------------------------------------------------------------
Status: DAG finished successfully in 0.71 seconds

Query Execution Summary
----------------------------------------------------------------------------------------------
OPERATION                                          DURATION
----------------------------------------------------------------------------------------------
Compile Query                                         0.21s
Prepare Plan                                          0.13s
Submit Plan                                           0.34s
Start DAG                                             0.23s
Run DAG                                               0.71s
----------------------------------------------------------------------------------------------

Task Execution Summary
----------------------------------------------------------------------------------------------
  VERTICES   DURATION(ms)   CPU_TIME(ms)   GC_TIME(ms)   INPUT_RECORDS   OUTPUT_RECORDS
----------------------------------------------------------------------------------------------
     Map 1         604.00              0             0      59,957,438               13
 Reducer 2         105.00              0             0              13                0
----------------------------------------------------------------------------------------------

LLAP IO Summary
----------------------------------------------------------------------------------------------
  VERTICES  ROWGROUPS  META_HIT  META_MISS  DATA_HIT  DATA_MISS  ALLOCATION      USED  TOTAL_IO
----------------------------------------------------------------------------------------------
     Map 1       6036         0        146        0B    68.86MB    491.00MB  479.89MB     7.94s
----------------------------------------------------------------------------------------------

OK
0.1
Time taken: 1.669 seconds, Fetched: 1 row(s)
hive(tpch_flat_orc_10)>

This is running against a single 16 core box, and I would assume it would take <1.4s to read twice as much (13 tasks is barely touching the load factors). It would probably be a bit faster if the cache had hits, but in general 14s to read 100M rows is nearly an order of magnitude off where Hive 2.2.0 is.

Cheers,
Gopal
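(To make the SPARK-12854 point above concrete: Spark 2.0 exposes its vectorized Parquet reader behind a config key, so a comparable scan in the spark-sql shell would look roughly like this; the warehouse path is a made-up placeholder:)

spark-sql> SET spark.sql.parquet.enableVectorizedReader=true;
spark-sql> SELECT max(id) FROM parquet.`/user/hive/warehouse/dummy_parquet`;  -- path assumed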