git commit: [SQL] Improve column pruning.
Repository: spark Updated Branches: refs/heads/branch-1.0 721194bda -> f66f76648 [SQL] Improve column pruning. Fixed a bug that was preventing us from ever pruning beneath Joins. ## TPC-DS Q3 ### Before: ``` Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2] Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150) Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79] Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43] HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight Exchange (HashPartitioning [ss_item_sk#36:30], 150) HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight Exchange (HashPartitioning [d_date_sk#6:0], 150) Filter (d_moy#14:8 = 12) HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150) HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None Exchange (HashPartitioning [i_item_sk#57:0], 150) Filter (i_manufact_id#70:13 = 436) HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None ``` ### After ``` Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162] Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150) Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239] Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0] HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight Exchange (HashPartitioning [ss_item_sk#196:2], 150) Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3] HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight Exchange (HashPartitioning [d_date_sk#166:0], 150) Project [d_date_sk#166:0,d_year#172:1] Filter (d_moy#174:2 = 12) HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150) HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None Exchange 
(HashPartitioning [i_item_sk#217:1], 150) Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2] Filter (i_manufact_id#230:3 = 436) HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None ``` Author: Michael Armbrust Closes #729 from marmbrus/fixPruning and squashes the following commits: 5feeff0 [Michael Armbrust] Improve column pruning. (cherry picked from commit 6ce0884446d3571fd6e9d967a080a59c657543b1) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f66f7664 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f66f7664 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f66f7664 Branch: refs/heads/branch-1.0 Commit: f66f76648d32f2ca274b623db395df8a9c6e7d64 Parents: 721194b Author: Michael Armbrust Authored: Tue May 13 23:27:22 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 23:27:29 2014 -0700 -- .../spark/sql/catalyst/optimizer/Optimizer.scala| 16 +++- 1 file changed, 11 insertions(+), 5 deletions(-) --
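The Optimizer.scala change itself is not reproduced in this message, so as a rough, self-contained illustration of what "pruning beneath joins" means, here is a toy Scala sketch (not Catalyst's actual classes or rule): the columns requested above a join, plus the join keys, are pushed into a Project on each side, which is what turns the wide HiveTableScans in the "Before" plan into the narrow three- and four-column scans in the "After" plan.

```scala
// Toy plan model for illustration only; the real rule operates on Catalyst
// logical plans inside sql/catalyst's Optimizer.scala.
sealed trait Plan { def output: Set[String] }
case class Scan(table: String, output: Set[String]) extends Plan
case class Project(columns: Set[String], child: Plan) extends Plan {
  def output: Set[String] = columns
}
case class Join(left: Plan, right: Plan, keys: Set[String]) extends Plan {
  def output: Set[String] = left.output ++ right.output
}

// Push the columns required above the join down to each side, so that the
// underlying scans only need to produce referenced columns plus join keys.
def pruneBelowJoin(required: Set[String], join: Join): Join = {
  val leftNeeded  = (required ++ join.keys).intersect(join.left.output)
  val rightNeeded = (required ++ join.keys).intersect(join.right.output)
  join.copy(
    left  = Project(leftNeeded, join.left),
    right = Project(rightNeeded, join.right))
}

val joined = Join(
  Scan("store_sales", Set("ss_item_sk", "ss_ext_sales_price", "ss_net_paid")),
  Scan("item", Set("i_item_sk", "i_brand", "i_product_name")),
  keys = Set("ss_item_sk", "i_item_sk"))

// Only the columns the query needs (plus join keys) survive below the join.
println(pruneBelowJoin(Set("ss_ext_sales_price", "i_brand"), joined))
```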
git commit: Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"
Repository: spark Updated Branches: refs/heads/branch-1.0 92b0ec9ac -> 721194bda Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition" This reverts commit 66fe4797a845bb1a2728dcdb2d7371f0e90da867. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/721194bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/721194bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/721194bd Branch: refs/heads/branch-1.0 Commit: 721194bdaa54429e76bb8b527154cdfd9c9d0e37 Parents: 92b0ec9 Author: Patrick Wendell Authored: Tue May 13 23:25:19 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 23:25:19 2014 -0700 -- .../scala/org/apache/spark/Partitioner.scala| 61 .../org/apache/spark/PartitioningSuite.scala| 34 --- 2 files changed, 95 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/721194bd/core/src/main/scala/org/apache/spark/Partitioner.scala -- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 6274796..9155159 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } } - -/** - * A [[org.apache.spark.Partitioner]] that partitions records into specified bounds - * Default value is 1000. Once all partitions have bounds elements, the partitioner - * allocates 1 element per partition so eventually the smaller partitions are at most - * off by 1 key compared to the larger partitions. - */ -class BoundaryPartitioner[K : Ordering : ClassTag, V]( -partitions: Int, -@transient rdd: RDD[_ <: Product2[K,V]], -private val boundary: Int = 1000) - extends Partitioner { - - // this array keeps track of keys assigned to a partition - // counts[0] refers to # of keys in partition 0 and so on - private val counts: Array[Int] = { -new Array[Int](numPartitions) - } - - def numPartitions = math.abs(partitions) - - /* - * Ideally, this should've been calculated based on # partitions and total keys - * But we are not calling count on RDD here to avoid calling an action. - * User has the flexibility of calling count and passing in any appropriate boundary - */ - def keysPerPartition = boundary - - var currPartition = 0 - - /* - * Pick current partition for the key until we hit the bound for keys / partition, - * start allocating to next partition at that time. - * - * NOTE: In case where we have lets say 2000 keys and user says 3 partitions with 500 - * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 1001-1500 go to P3, - * after that, next keys go to one partition at a time. So 1501 goes to P1, 1502 goes to P2, - * 1503 goes to P3 and so on. - */ - def getPartition(key: Any): Int = { -val partition = currPartition -counts(partition) = counts(partition) + 1 -/* -* Since we are filling up a partition before moving to next one (this helps in maintaining -* order of keys, in certain cases, it is possible to end up with empty partitions, like -* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition will be entirely -* empty. 
- */ -if(counts(currPartition) >= keysPerPartition) - currPartition = (currPartition + 1) % numPartitions -partition - } - - override def equals(other: Any): Boolean = other match { -case r: BoundaryPartitioner[_,_] => - (r.counts.sameElements(counts) && r.boundary == boundary -&& r.currPartition == currPartition) -case _ => - false - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/721194bd/core/src/test/scala/org/apache/spark/PartitioningSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7d40395..7c30626 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(descendingP4 != p4) } - test("BoundaryPartitioner equality") { -// Make an RDD where all the elements are the same so that the partition range bounds -// are deterministically all the same. -val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
git commit: [SQL] Improve column pruning.
Repository: spark Updated Branches: refs/heads/master 7bb9a521f -> 6ce088444 [SQL] Improve column pruning. Fixed a bug that was preventing us from ever pruning beneath Joins. ## TPC-DS Q3 ### Before: ``` Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2] Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150) Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79] Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43] HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight Exchange (HashPartitioning [ss_item_sk#36:30], 150) HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight Exchange (HashPartitioning [d_date_sk#6:0], 150) Filter (d_moy#14:8 = 12) HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150) HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None Exchange (HashPartitioning [i_item_sk#57:0], 150) Filter (i_manufact_id#70:13 = 436) HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None ``` ### After ``` Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162] Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150) Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239] Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0] HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight Exchange (HashPartitioning [ss_item_sk#196:2], 150) Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3] HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight Exchange (HashPartitioning [d_date_sk#166:0], 150) Project [d_date_sk#166:0,d_year#172:1] Filter (d_moy#174:2 = 12) HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150) HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None Exchange 
(HashPartitioning [i_item_sk#217:1], 150) Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2] Filter (i_manufact_id#230:3 = 436) HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None ``` Author: Michael Armbrust Closes #729 from marmbrus/fixPruning and squashes the following commits: 5feeff0 [Michael Armbrust] Improve column pruning. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ce08844 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ce08844 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ce08844 Branch: refs/heads/master Commit: 6ce0884446d3571fd6e9d967a080a59c657543b1 Parents: 7bb9a52 Author: Michael Armbrust Authored: Tue May 13 23:27:22 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 23:27:22 2014 -0700 -- .../spark/sql/catalyst/optimizer/Optimizer.scala| 16 +++- 1 file changed, 11 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6ce08844/sql/catalyst/src
git commit: Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"
Repository: spark Updated Branches: refs/heads/master c33b8dcbf -> 7bb9a521f Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition" This reverts commit 92cebada09a7e5a00ab48bcb350a9462949c33eb. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb9a521 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb9a521 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb9a521 Branch: refs/heads/master Commit: 7bb9a521f35eb19576c6cc2da3fd385910270e46 Parents: c33b8dc Author: Patrick Wendell Authored: Tue May 13 23:24:51 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 23:24:51 2014 -0700 -- .../scala/org/apache/spark/Partitioner.scala| 61 .../org/apache/spark/PartitioningSuite.scala| 34 --- 2 files changed, 95 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7bb9a521/core/src/main/scala/org/apache/spark/Partitioner.scala -- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 6274796..9155159 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } } - -/** - * A [[org.apache.spark.Partitioner]] that partitions records into specified bounds - * Default value is 1000. Once all partitions have bounds elements, the partitioner - * allocates 1 element per partition so eventually the smaller partitions are at most - * off by 1 key compared to the larger partitions. - */ -class BoundaryPartitioner[K : Ordering : ClassTag, V]( -partitions: Int, -@transient rdd: RDD[_ <: Product2[K,V]], -private val boundary: Int = 1000) - extends Partitioner { - - // this array keeps track of keys assigned to a partition - // counts[0] refers to # of keys in partition 0 and so on - private val counts: Array[Int] = { -new Array[Int](numPartitions) - } - - def numPartitions = math.abs(partitions) - - /* - * Ideally, this should've been calculated based on # partitions and total keys - * But we are not calling count on RDD here to avoid calling an action. - * User has the flexibility of calling count and passing in any appropriate boundary - */ - def keysPerPartition = boundary - - var currPartition = 0 - - /* - * Pick current partition for the key until we hit the bound for keys / partition, - * start allocating to next partition at that time. - * - * NOTE: In case where we have lets say 2000 keys and user says 3 partitions with 500 - * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 1001-1500 go to P3, - * after that, next keys go to one partition at a time. So 1501 goes to P1, 1502 goes to P2, - * 1503 goes to P3 and so on. - */ - def getPartition(key: Any): Int = { -val partition = currPartition -counts(partition) = counts(partition) + 1 -/* -* Since we are filling up a partition before moving to next one (this helps in maintaining -* order of keys, in certain cases, it is possible to end up with empty partitions, like -* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition will be entirely -* empty. 
- */ -if(counts(currPartition) >= keysPerPartition) - currPartition = (currPartition + 1) % numPartitions -partition - } - - override def equals(other: Any): Boolean = other match { -case r: BoundaryPartitioner[_,_] => - (r.counts.sameElements(counts) && r.boundary == boundary -&& r.currPartition == currPartition) -case _ => - false - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/7bb9a521/core/src/test/scala/org/apache/spark/PartitioningSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7d40395..7c30626 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(descendingP4 != p4) } - test("BoundaryPartitioner equality") { -// Make an RDD where all the elements are the same so that the partition range bounds -// are deterministically all the same. -val rdd = sc.parallelize(1.to(4000)).map(x => (x, x)) - -v
git commit: SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo
Repository: spark Updated Branches: refs/heads/branch-1.0 f6323eb3b -> 5c8e8de99 SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo This was used in the past to have a cache of deserialized ShuffleMapTasks, but that's been removed, so there's no need for a lock. It slows down Spark when task descriptions are large, e.g. due to large lineage graphs or local variables. Author: Sandeep Closes #707 from techaddict/SPARK-1775 and squashes the following commits: 18d8ebf [Sandeep] SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo This was used in the past to have a cache of deserialized ShuffleMapTasks, but that's been removed, so there's no need for a lock. It slows down Spark when task descriptions are large, e.g. due to large lineage graphs or local variables. (cherry picked from commit 7db47c463fefc244e9c100d4aab90451c3828261) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c8e8de9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c8e8de9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c8e8de9 Branch: refs/heads/branch-1.0 Commit: 5c8e8de99ffa5aadc1a130c9a3cbeb3c4936eb71 Parents: f6323eb Author: Sandeep Authored: Thu May 8 22:30:17 2014 -0700 Committer: Patrick Wendell Committed: Thu May 8 22:30:58 2014 -0700 -- .../org/apache/spark/scheduler/ShuffleMapTask.scala | 16 +++- 1 file changed, 7 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c8e8de9/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 4b0324f..9ba586f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -57,15 +57,13 @@ private[spark] object ShuffleMapTask { } def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = { -synchronized { - val loader = Thread.currentThread.getContextClassLoader - val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) - val ser = SparkEnv.get.closureSerializer.newInstance() - val objIn = ser.deserializeStream(in) - val rdd = objIn.readObject().asInstanceOf[RDD[_]] - val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]] - (rdd, dep) -} +val loader = Thread.currentThread.getContextClassLoader +val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) +val ser = SparkEnv.get.closureSerializer.newInstance() +val objIn = ser.deserializeStream(in) +val rdd = objIn.readObject().asInstanceOf[RDD[_]] +val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]] +(rdd, dep) } // Since both the JarSet and FileSet have the same format this is used for both.
git commit: [SQL] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext.
Repository: spark Updated Branches: refs/heads/branch-1.0 ef5e9d70f -> 618b3e6e7 [SQL] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext. Author: Michael Armbrust Closes #761 from marmbrus/existingContext and squashes the following commits: 4651051 [Michael Armbrust] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext. (cherry picked from commit 44233865cf8020741d862d33cc660c88e9315dea) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/618b3e6e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/618b3e6e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/618b3e6e Branch: refs/heads/branch-1.0 Commit: 618b3e6e7d0bb826ed333b803fe0a7214e1b14ad Parents: ef5e9d7 Author: Michael Armbrust Authored: Tue May 13 21:23:51 2014 -0700 Committer: Reynold Xin Committed: Tue May 13 21:24:01 2014 -0700 -- python/pyspark/sql.py | 7 +-- .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/618b3e6e/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 6789d70..bbe69e7 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -28,7 +28,7 @@ class SQLContext: register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files. """ -def __init__(self, sparkContext): +def __init__(self, sparkContext, sqlContext = None): """ Create a new SQLContext. @@ -58,10 +58,13 @@ class SQLContext: self._jvm = self._sc._jvm self._pythonToJavaMap = self._jvm.PythonRDD.pythonToJavaMap +if sqlContext: +self._scala_SQLContext = sqlContext + @property def _ssql_ctx(self): """ -Accessor for the JVM SparkSQL context. Subclasses can overrite this property to provide +Accessor for the JVM SparkSQL context. Subclasses can override this property to provide their own JVM Contexts. """ if not hasattr(self, '_scala_SQLContext'): http://git-wip-us.apache.org/repos/asf/spark/blob/618b3e6e/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 57facbe..6f7d431 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -33,9 +33,9 @@ import org.apache.spark.util.Utils /** * The entry point for executing Spark SQL queries from a Java program. */ -class JavaSQLContext(sparkContext: JavaSparkContext) { +class JavaSQLContext(val sqlContext: SQLContext) { - val sqlContext = new SQLContext(sparkContext.sc) + def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc)) /** * Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
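A minimal usage sketch of the change surfaced in the JavaSQLContext diff above, which makes an existing Scala SQLContext the primary constructor argument (the local master and app name below are placeholders for illustration):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.api.java.JavaSQLContext

// Placeholder local context for the example.
val sc = new SparkContext("local", "shared-sqlcontext-example")

// One Scala SQLContext owns the catalog, cached tables, and so on.
val sqlContext = new SQLContext(sc)

// Previously JavaSQLContext always built its own SQLContext from a
// JavaSparkContext; with this patch it can wrap the existing one, so the
// Java API sees the same registered tables and cached data.
val javaSqlContext = new JavaSQLContext(sqlContext)
```

On the Python side, the same diff gives pyspark's SQLContext an optional `sqlContext` constructor argument that is stored as `_scala_SQLContext`, so a Python SQLContext can likewise reuse an existing JVM context instead of creating a new one.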
git commit: [SPARK-1784] Add a new partitioner to allow specifying # of keys per partition
Repository: spark Updated Branches: refs/heads/branch-1.0 618b3e6e7 -> 66fe4797a [SPARK-1784] Add a new partitioner to allow specifying # of keys per partition This change adds a new partitioner which allows users to specify # of keys per partition. Author: Syed Hashmi Closes #721 from syedhashmi/master and squashes the following commits: 4ca94cc [Syed Hashmi] [SPARK-1784] Add a new partitioner (cherry picked from commit 92cebada09a7e5a00ab48bcb350a9462949c33eb) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66fe4797 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66fe4797 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66fe4797 Branch: refs/heads/branch-1.0 Commit: 66fe4797a845bb1a2728dcdb2d7371f0e90da867 Parents: 618b3e6 Author: Syed Hashmi Authored: Tue May 13 21:24:23 2014 -0700 Committer: Reynold Xin Committed: Tue May 13 21:25:01 2014 -0700 -- .../scala/org/apache/spark/Partitioner.scala| 61 .../org/apache/spark/PartitioningSuite.scala| 34 +++ 2 files changed, 95 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/66fe4797/core/src/main/scala/org/apache/spark/Partitioner.scala -- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9155159..6274796 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -156,3 +156,64 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } } + +/** + * A [[org.apache.spark.Partitioner]] that partitions records into specified bounds + * Default value is 1000. Once all partitions have bounds elements, the partitioner + * allocates 1 element per partition so eventually the smaller partitions are at most + * off by 1 key compared to the larger partitions. + */ +class BoundaryPartitioner[K : Ordering : ClassTag, V]( +partitions: Int, +@transient rdd: RDD[_ <: Product2[K,V]], +private val boundary: Int = 1000) + extends Partitioner { + + // this array keeps track of keys assigned to a partition + // counts[0] refers to # of keys in partition 0 and so on + private val counts: Array[Int] = { +new Array[Int](numPartitions) + } + + def numPartitions = math.abs(partitions) + + /* + * Ideally, this should've been calculated based on # partitions and total keys + * But we are not calling count on RDD here to avoid calling an action. + * User has the flexibility of calling count and passing in any appropriate boundary + */ + def keysPerPartition = boundary + + var currPartition = 0 + + /* + * Pick current partition for the key until we hit the bound for keys / partition, + * start allocating to next partition at that time. + * + * NOTE: In case where we have lets say 2000 keys and user says 3 partitions with 500 + * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 1001-1500 go to P3, + * after that, next keys go to one partition at a time. So 1501 goes to P1, 1502 goes to P2, + * 1503 goes to P3 and so on. + */ + def getPartition(key: Any): Int = { +val partition = currPartition +counts(partition) = counts(partition) + 1 +/* +* Since we are filling up a partition before moving to next one (this helps in maintaining +* order of keys, in certain cases, it is possible to end up with empty partitions, like +* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition will be entirely +* empty. 
+ */ +if(counts(currPartition) >= keysPerPartition) + currPartition = (currPartition + 1) % numPartitions +partition + } + + override def equals(other: Any): Boolean = other match { +case r: BoundaryPartitioner[_,_] => + (r.counts.sameElements(counts) && r.boundary == boundary +&& r.currPartition == currPartition) +case _ => + false + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/66fe4797/core/src/test/scala/org/apache/spark/PartitioningSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7c30626..7d40395 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -66,6 +66,40 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(des
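For context, a usage sketch of the partitioner as defined in the diff above; the numbers are arbitrary and only show the constructor shape (3 target partitions, at most 500 keys per partition before keys start being handed out one at a time). Note that later messages in this digest show the change being reverted on both branches.

```scala
import org.apache.spark.{BoundaryPartitioner, SparkContext}
import org.apache.spark.SparkContext._  // pair-RDD implicits for partitionBy

val sc = new SparkContext("local", "boundary-partitioner-example")
val pairs = sc.parallelize(1 to 2000).map(x => (x, x))

// Fill partition 0 with the first 500 keys, partition 1 with the next 500,
// and so on, per the scaladoc in the diff above.
val partitioned = pairs.partitionBy(new BoundaryPartitioner(3, pairs, 500))
println(partitioned.partitions.length)  // 3
```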
git commit: [SQL] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext.
Repository: spark Updated Branches: refs/heads/master 753b04dea -> 44233865c [SQL] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext. Author: Michael Armbrust Closes #761 from marmbrus/existingContext and squashes the following commits: 4651051 [Michael Armbrust] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44233865 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44233865 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44233865 Branch: refs/heads/master Commit: 44233865cf8020741d862d33cc660c88e9315dea Parents: 753b04d Author: Michael Armbrust Authored: Tue May 13 21:23:51 2014 -0700 Committer: Reynold Xin Committed: Tue May 13 21:23:51 2014 -0700 -- python/pyspark/sql.py | 7 +-- .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/44233865/python/pyspark/sql.py -- diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 6789d70..bbe69e7 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -28,7 +28,7 @@ class SQLContext: register L{SchemaRDD}s as tables, execute sql over tables, cache tables, and read parquet files. """ -def __init__(self, sparkContext): +def __init__(self, sparkContext, sqlContext = None): """ Create a new SQLContext. @@ -58,10 +58,13 @@ class SQLContext: self._jvm = self._sc._jvm self._pythonToJavaMap = self._jvm.PythonRDD.pythonToJavaMap +if sqlContext: +self._scala_SQLContext = sqlContext + @property def _ssql_ctx(self): """ -Accessor for the JVM SparkSQL context. Subclasses can overrite this property to provide +Accessor for the JVM SparkSQL context. Subclasses can override this property to provide their own JVM Contexts. """ if not hasattr(self, '_scala_SQLContext'): http://git-wip-us.apache.org/repos/asf/spark/blob/44233865/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 57facbe..6f7d431 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -33,9 +33,9 @@ import org.apache.spark.util.Utils /** * The entry point for executing Spark SQL queries from a Java program. */ -class JavaSQLContext(sparkContext: JavaSparkContext) { +class JavaSQLContext(val sqlContext: SQLContext) { - val sqlContext = new SQLContext(sparkContext.sc) + def this(sparkContext: JavaSparkContext) = this(new SQLContext(sparkContext.sc)) /** * Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
git commit: Implement ApproximateCountDistinct for SparkSql
Repository: spark Updated Branches: refs/heads/branch-1.0 66fe4797a -> 92b0ec9ac Implement ApproximateCountDistinct for SparkSql Add the implementation for ApproximateCountDistinct to SparkSql. We use the HyperLogLog algorithm implemented in stream-lib, and do the count in two phases: 1) counting the number of distinct elements in each partitions, and 2) merge the HyperLogLog results from different partitions. A simple serializer and test cases are added as well. Author: larvaboy Closes #737 from larvaboy/master and squashes the following commits: bd8ef3f [larvaboy] Add support of user-provided standard deviation to ApproxCountDistinct. 9ba8360 [larvaboy] Fix alignment and null handling issues. 95b4067 [larvaboy] Add a test case for count distinct and approximate count distinct. f57917d [larvaboy] Add the parser for the approximate count. a2d5d10 [larvaboy] Add ApproximateCountDistinct aggregates and functions. 7ad273a [larvaboy] Add SparkSql serializer for HyperLogLog. 1d9aacf [larvaboy] Fix a minor typo in the toString method of the Count case class. 653542b [larvaboy] Fix a couple of minor typos. (cherry picked from commit c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b0ec9a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b0ec9a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b0ec9a Branch: refs/heads/branch-1.0 Commit: 92b0ec9ac9b082f8dea185ef9b462c0f3e3966e2 Parents: 66fe479 Author: larvaboy Authored: Tue May 13 21:26:08 2014 -0700 Committer: Reynold Xin Committed: Tue May 13 21:26:21 2014 -0700 -- .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 +- .../apache/spark/sql/catalyst/SqlParser.scala | 7 ++ .../sql/catalyst/expressions/aggregates.scala | 78 +++- .../sql/execution/SparkSqlSerializer.scala | 17 + .../org/apache/spark/sql/SQLQuerySuite.scala| 21 +- 5 files changed, 122 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92b0ec9a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 5efb438..bc6d204 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -217,7 +217,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. Uses the provided + * more accurate counts but increase the memory footprint and vice versa. Uses the provided * Partitioner to partition the output RDD. */ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { @@ -232,7 +232,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key in this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. 
HashPartitions the + * more accurate counts but increase the memory footprint and vice versa. HashPartitions the * output RDD into numPartitions. * */ @@ -244,7 +244,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Return approximate number of distinct values for each key this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in - * more accurate counts but increase the memory footprint and vise versa. The default value of + * more accurate counts but increase the memory footprint and vice versa. The default value of * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism * level. */ http://git-wip-us.apache.org/repos/asf/spark/blob/92b0ec9a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sq
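The SqlParser and SQLQuerySuite hunks are cut off above, so the exact SQL surface is not visible in this message; the sketch below is a hedged guess at how the new aggregate is reached from user code, and the table name, sample data, and the precise placement of the APPROXIMATE keyword are assumptions rather than something confirmed by the visible diff.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "approx-count-distinct-example")
val sqlContext = new SQLContext(sc)
import sqlContext._  // createSchemaRDD implicit plus the sql(...) method

// Hypothetical table purely for illustration.
case class Record(key: Int, value: String)
val records = sc.parallelize(1 to 100000).map(i => Record(i % 1000, "v" + i))
records.registerAsTable("records")

// Default relative standard deviation, then a user-provided one (the commit
// log above mentions support for a user-provided standard deviation).
sql("SELECT APPROXIMATE COUNT(DISTINCT key) FROM records").collect()
sql("SELECT APPROXIMATE(0.02) COUNT(DISTINCT key) FROM records").collect()
```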
git commit: [SPARK-1784] Add a new partitioner to allow specifying # of keys per partition
Repository: spark Updated Branches: refs/heads/master 44233865c -> 92cebada0 [SPARK-1784] Add a new partitioner to allow specifying # of keys per partition This change adds a new partitioner which allows users to specify # of keys per partition. Author: Syed Hashmi Closes #721 from syedhashmi/master and squashes the following commits: 4ca94cc [Syed Hashmi] [SPARK-1784] Add a new partitioner Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92cebada Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92cebada Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92cebada Branch: refs/heads/master Commit: 92cebada09a7e5a00ab48bcb350a9462949c33eb Parents: 4423386 Author: Syed Hashmi Authored: Tue May 13 21:24:23 2014 -0700 Committer: Reynold Xin Committed: Tue May 13 21:24:23 2014 -0700 -- .../scala/org/apache/spark/Partitioner.scala| 61 .../org/apache/spark/PartitioningSuite.scala| 34 +++ 2 files changed, 95 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92cebada/core/src/main/scala/org/apache/spark/Partitioner.scala -- diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9155159..6274796 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -156,3 +156,64 @@ class RangePartitioner[K : Ordering : ClassTag, V]( false } } + +/** + * A [[org.apache.spark.Partitioner]] that partitions records into specified bounds + * Default value is 1000. Once all partitions have bounds elements, the partitioner + * allocates 1 element per partition so eventually the smaller partitions are at most + * off by 1 key compared to the larger partitions. + */ +class BoundaryPartitioner[K : Ordering : ClassTag, V]( +partitions: Int, +@transient rdd: RDD[_ <: Product2[K,V]], +private val boundary: Int = 1000) + extends Partitioner { + + // this array keeps track of keys assigned to a partition + // counts[0] refers to # of keys in partition 0 and so on + private val counts: Array[Int] = { +new Array[Int](numPartitions) + } + + def numPartitions = math.abs(partitions) + + /* + * Ideally, this should've been calculated based on # partitions and total keys + * But we are not calling count on RDD here to avoid calling an action. + * User has the flexibility of calling count and passing in any appropriate boundary + */ + def keysPerPartition = boundary + + var currPartition = 0 + + /* + * Pick current partition for the key until we hit the bound for keys / partition, + * start allocating to next partition at that time. + * + * NOTE: In case where we have lets say 2000 keys and user says 3 partitions with 500 + * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 1001-1500 go to P3, + * after that, next keys go to one partition at a time. So 1501 goes to P1, 1502 goes to P2, + * 1503 goes to P3 and so on. + */ + def getPartition(key: Any): Int = { +val partition = currPartition +counts(partition) = counts(partition) + 1 +/* +* Since we are filling up a partition before moving to next one (this helps in maintaining +* order of keys, in certain cases, it is possible to end up with empty partitions, like +* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition will be entirely +* empty. 
+ */ +if(counts(currPartition) >= keysPerPartition) + currPartition = (currPartition + 1) % numPartitions +partition + } + + override def equals(other: Any): Boolean = other match { +case r: BoundaryPartitioner[_,_] => + (r.counts.sameElements(counts) && r.boundary == boundary +&& r.currPartition == currPartition) +case _ => + false + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/92cebada/core/src/test/scala/org/apache/spark/PartitioningSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala index 7c30626..7d40395 100644 --- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala +++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala @@ -66,6 +66,40 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet assert(descendingP4 != p4) } + test("BoundaryPartitioner equality") { +// Make an RDD where all the elemen
git commit: [SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath
Repository: spark Updated Branches: refs/heads/master 5c0dafc2c -> 753b04dea [SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath JIRA issue: [SPARK-1527](https://issues.apache.org/jira/browse/SPARK-1527) getName() only gets the last component of the file path. When deleting test-generated directories, we should pass the generated directory's absolute path to DiskBlockManager. Author: Ye Xianjin This patch had conflicts when merged, resolved by Committer: Patrick Wendell Closes #436 from advancedxy/SPARK-1527 and squashes the following commits: 4678bab [Ye Xianjin] change rootDir*.getname to rootDir*.getAbsolutePath so the temporary directories are deleted when the test is finished. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/753b04de Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/753b04de Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/753b04de Branch: refs/heads/master Commit: 753b04dea4b04ba9d0dd0011f00e9d70367e76fc Parents: 5c0dafc Author: Ye Xianjin Authored: Tue May 13 19:03:51 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 19:03:51 2014 -0700 -- .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/753b04de/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 2167718..aaa7714 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -52,7 +52,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before rootDir0.deleteOnExit() rootDir1 = Files.createTempDir() rootDir1.deleteOnExit() -rootDirs = rootDir0.getName + "," + rootDir1.getName +rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath println("Created root dirs: " + rootDirs) }
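The one-line fix hinges on the difference between the two java.io.File accessors; a quick standalone illustration using the same Guava helper the suite uses (the printed paths are examples, not exact values):

```scala
import com.google.common.io.Files

val dir = Files.createTempDir()
// getName is only the last path component, while getAbsolutePath is the full
// path that another component (here, DiskBlockManager) needs in order to
// locate the directory and clean it up after the test.
println(dir.getName)          // e.g. "1400034820212-0"
println(dir.getAbsolutePath)  // e.g. "/tmp/1400034820212-0"
dir.delete()
```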
git commit: [SPARK-1816] LiveListenerBus dies if a listener throws an exception
Repository: spark Updated Branches: refs/heads/branch-1.0 d6994f4e6 -> 3892ec584 [SPARK-1816] LiveListenerBus dies if a listener throws an exception The solution is to wrap a try / catch / log around the posting of each event to each listener. Author: Andrew Or Closes #759 from andrewor14/listener-die and squashes the following commits: aee5107 [Andrew Or] Merge branch 'master' of github.com:apache/spark into listener-die 370939f [Andrew Or] Remove two layers of indirection 422d278 [Andrew Or] Explicitly throw an exception instead of 1 / 0 0df0e2a [Andrew Or] Try/catch and log exceptions when posting events (cherry picked from commit 5c0dafc2c8734a421206a808b73be67b66264dd7) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3892ec58 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3892ec58 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3892ec58 Branch: refs/heads/branch-1.0 Commit: 3892ec584706a0ee122062ab896a7aca0ff02d93 Parents: d6994f4 Author: Andrew Or Authored: Tue May 13 18:32:32 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 18:32:44 2014 -0700 -- .../spark/scheduler/LiveListenerBus.scala | 36 ++ .../spark/scheduler/SparkListenerBus.scala | 50 +--- .../scala/org/apache/spark/util/Utils.scala | 2 +- .../spark/scheduler/SparkListenerSuite.scala| 50 ++-- 4 files changed, 109 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3892ec58/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index dec3316..36a6e63 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.util.concurrent.{LinkedBlockingQueue, Semaphore} import org.apache.spark.Logging +import org.apache.spark.util.Utils /** * Asynchronously passes SparkListenerEvents to registered SparkListeners. @@ -42,7 +43,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val listenerThread = new Thread("SparkListenerBus") { setDaemon(true) -override def run() { +override def run(): Unit = Utils.logUncaughtExceptions { while (true) { eventLock.acquire() // Atomically remove and process this event @@ -77,11 +78,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() -} else if (!queueFullErrorMessageLogged) { - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + -"This likely means one of the SparkListeners is too slow and cannot keep up with the " + -"rate at which tasks are being started by the scheduler.") - queueFullErrorMessageLogged = true +} else { + logQueueFullErrorMessage() } } @@ -96,14 +94,19 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (System.currentTimeMillis > finishTime) { return false } - /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify - * add overhead in the general case. */ + /* Sleep rather than using wait/notify, because this is used only for testing and + * wait/notify add overhead in the general case. */ Thread.sleep(10) } true } /** + * For testing only. 
Return whether the listener daemon thread is still alive. + */ + def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive } + + /** * Return whether the event queue is empty. * * The use of synchronized here guarantees that all events that once belonged to this queue @@ -111,6 +114,23 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { */ def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty } + /** + * Log an error message to indicate that the event queue is full. Do this only once. + */ + private def logQueueFullErrorMessage(): Unit = { +if (!queueFullErrorMessageLogged) { + if (listenerThread.isAlive) { +logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with" + + "the rate at which tasks are being started by the schedu
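The hunk that actually wraps listener invocation lives in SparkListenerBus.scala and is cut off above, so here is a minimal sketch of the stated approach ("wrap a try / catch / log around the posting of each event to each listener") using hypothetical stand-in types rather than Spark's own:

```scala
// Stand-in for SparkListener; only the shape of the idea is shown here.
trait Listener {
  def onEvent(event: String): Unit
}

def postToAll(listeners: Seq[Listener], event: String): Unit = {
  listeners.foreach { listener =>
    try {
      listener.onEvent(event)
    } catch {
      // Log and keep going: one misbehaving listener no longer kills the
      // listener bus thread or starves the remaining listeners.
      case e: Exception =>
        System.err.println(
          s"Listener ${listener.getClass.getName} threw an exception: $e")
    }
  }
}
```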
git commit: [SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath
Repository: spark Updated Branches: refs/heads/branch-1.0 3892ec584 -> ef5e9d70f [SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath JIRA issue: [SPARK-1527](https://issues.apache.org/jira/browse/SPARK-1527) getName() only gets the last component of the file path. When deleting test-generated directories, we should pass the generated directory's absolute path to DiskBlockManager. Author: Ye Xianjin This patch had conflicts when merged, resolved by Committer: Patrick Wendell Closes #436 from advancedxy/SPARK-1527 and squashes the following commits: 4678bab [Ye Xianjin] change rootDir*.getname to rootDir*.getAbsolutePath so the temporary directories are deleted when the test is finished. (cherry picked from commit 753b04dea4b04ba9d0dd0011f00e9d70367e76fc) Signed-off-by: Patrick Wendell Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef5e9d70 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef5e9d70 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef5e9d70 Branch: refs/heads/branch-1.0 Commit: ef5e9d70fafe9b819a6351fd041d0466e5c1d42d Parents: 3892ec5 Author: Ye Xianjin Authored: Tue May 13 19:03:51 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 19:04:23 2014 -0700 -- .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef5e9d70/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala index 2167718..aaa7714 100644 --- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala @@ -52,7 +52,7 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before rootDir0.deleteOnExit() rootDir1 = Files.createTempDir() rootDir1.deleteOnExit() -rootDirs = rootDir0.getName + "," + rootDir1.getName +rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath println("Created root dirs: " + rootDirs) }
git commit: [SPARK-1816] LiveListenerBus dies if a listener throws an exception
Repository: spark Updated Branches: refs/heads/master d1e487473 -> 5c0dafc2c [SPARK-1816] LiveListenerBus dies if a listener throws an exception The solution is to wrap a try / catch / log around the posting of each event to each listener. Author: Andrew Or Closes #759 from andrewor14/listener-die and squashes the following commits: aee5107 [Andrew Or] Merge branch 'master' of github.com:apache/spark into listener-die 370939f [Andrew Or] Remove two layers of indirection 422d278 [Andrew Or] Explicitly throw an exception instead of 1 / 0 0df0e2a [Andrew Or] Try/catch and log exceptions when posting events Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c0dafc2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c0dafc2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c0dafc2 Branch: refs/heads/master Commit: 5c0dafc2c8734a421206a808b73be67b66264dd7 Parents: d1e4874 Author: Andrew Or Authored: Tue May 13 18:32:32 2014 -0700 Committer: Patrick Wendell Committed: Tue May 13 18:32:32 2014 -0700 -- .../spark/scheduler/LiveListenerBus.scala | 36 ++ .../spark/scheduler/SparkListenerBus.scala | 50 +--- .../scala/org/apache/spark/util/Utils.scala | 2 +- .../spark/scheduler/SparkListenerSuite.scala| 50 ++-- 4 files changed, 109 insertions(+), 29 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5c0dafc2/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index dec3316..36a6e63 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler import java.util.concurrent.{LinkedBlockingQueue, Semaphore} import org.apache.spark.Logging +import org.apache.spark.util.Utils /** * Asynchronously passes SparkListenerEvents to registered SparkListeners. @@ -42,7 +43,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val listenerThread = new Thread("SparkListenerBus") { setDaemon(true) -override def run() { +override def run(): Unit = Utils.logUncaughtExceptions { while (true) { eventLock.acquire() // Atomically remove and process this event @@ -77,11 +78,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() -} else if (!queueFullErrorMessageLogged) { - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + -"This likely means one of the SparkListeners is too slow and cannot keep up with the " + -"rate at which tasks are being started by the scheduler.") - queueFullErrorMessageLogged = true +} else { + logQueueFullErrorMessage() } } @@ -96,14 +94,19 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (System.currentTimeMillis > finishTime) { return false } - /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify - * add overhead in the general case. */ + /* Sleep rather than using wait/notify, because this is used only for testing and + * wait/notify add overhead in the general case. */ Thread.sleep(10) } true } /** + * For testing only. Return whether the listener daemon thread is still alive. 
+ */ + def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive } + + /** * Return whether the event queue is empty. * * The use of synchronized here guarantees that all events that once belonged to this queue @@ -111,6 +114,23 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { */ def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty } + /** + * Log an error message to indicate that the event queue is full. Do this only once. + */ + private def logQueueFullErrorMessage(): Unit = { +if (!queueFullErrorMessageLogged) { + if (listenerThread.isAlive) { +logError("Dropping SparkListenerEvent because no remaining room in event queue. " + + "This likely means one of the SparkListeners is too slow and cannot keep up with" + + "the rate at which tasks are being started by the scheduler.") + } else { +logError("SparkListenerBus thread is dead! This means SparkListenerEvents hav
git commit: SPARK-1791 - SVM implementation does not use threshold parameter
Repository: spark Updated Branches: refs/heads/branch-1.0 d08e9604f -> d6994f4e6

SPARK-1791 - SVM implementation does not use threshold parameter

Summary: https://issues.apache.org/jira/browse/SPARK-1791

Simple fix, and backward compatible, since
- anyone who set the threshold was getting completely wrong answers.
- anyone who did not set the threshold had the default 0.0 value for the threshold anyway.

Test Plan: Unit test added that is verified to fail under the old implementation, and pass under the new implementation.
Reviewers: CC:
Author: Andrew Tulloch

Closes #725 from ajtulloch/SPARK-1791-SVM and squashes the following commits:
770f55d [Andrew Tulloch] SPARK-1791 - SVM implementation does not use threshold parameter

(cherry picked from commit d1e487473fd509f28daf28dcda856f3c2f1194ec)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6994f4e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6994f4e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6994f4e
Branch: refs/heads/branch-1.0 Commit: d6994f4e67c9ab98f7a707fc744939dc0c9107cf Parents: d08e960
Author: Andrew Tulloch Authored: Tue May 13 17:31:27 2014 -0700
Committer: Reynold Xin Committed: Tue May 13 17:31:38 2014 -0700

--
 .../apache/spark/mllib/classification/SVM.scala |  2 +-
 .../spark/mllib/classification/SVMSuite.scala   | 37
 2 files changed, 38 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d6994f4e/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index e052135..316ecd7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -65,7 +65,7 @@ class SVMModel private[mllib] (
       intercept: Double) = {
     val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     threshold match {
-      case Some(t) => if (margin < 0) 0.0 else 1.0
+      case Some(t) => if (margin < t) 0.0 else 1.0
       case None => margin
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6994f4e/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 77d6f04..886c71d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -69,6 +69,43 @@ class SVMSuite extends FunSuite with LocalSparkContext {
     assert(numOffPredictions < input.length / 5)
   }

+  test("SVM with threshold") {
+    val nPoints = 1
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithSGD().setIntercept(true)
+    svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+    val model = svm.run(testRDD)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+
+    var predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) != predictions.length)
+
+    // High threshold makes all the predictions 0.0
+    model.setThreshold(1.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) == predictions.length)
+
+    // Low threshold makes all the predictions 1.0
+    model.setThreshold(-1.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 1.0) == predictions.length)
+  }
+
   test("SVM using local random SGD") {
     val nPoints = 1
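The one-character fix above changes which cutoff the model applies. To see the effect in isolation, here is a small, self-contained sketch of the two decision rules; the function names and margin value are illustrative and not part of MLlib:

```
object ThresholdRuleSketch {
  // Decision rule before the patch: `t` is bound but never used, so the
  // cutoff is always the hard-coded 0.0.
  def predictBefore(margin: Double, threshold: Option[Double]): Double =
    threshold match {
      case Some(t) => if (margin < 0) 0.0 else 1.0
      case None => margin
    }

  // Decision rule after the patch: the user-supplied cutoff is honored.
  def predictAfter(margin: Double, threshold: Option[Double]): Double =
    threshold match {
      case Some(t) => if (margin < t) 0.0 else 1.0
      case None => margin
    }

  def main(args: Array[String]): Unit = {
    // A margin of 0.5 with a threshold of 1.0: the old rule predicts 1.0,
    // the fixed rule predicts 0.0.
    assert(predictBefore(0.5, Some(1.0)) == 1.0)
    assert(predictAfter(0.5, Some(1.0)) == 0.0)
  }
}
```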
git commit: SPARK-1791 - SVM implementation does not use threshold parameter
Repository: spark Updated Branches: refs/heads/master 16ffadcc4 -> d1e487473

SPARK-1791 - SVM implementation does not use threshold parameter

Summary: https://issues.apache.org/jira/browse/SPARK-1791

Simple fix, and backward compatible, since
- anyone who set the threshold was getting completely wrong answers.
- anyone who did not set the threshold had the default 0.0 value for the threshold anyway.

Test Plan: Unit test added that is verified to fail under the old implementation, and pass under the new implementation.
Reviewers: CC:
Author: Andrew Tulloch

Closes #725 from ajtulloch/SPARK-1791-SVM and squashes the following commits:
770f55d [Andrew Tulloch] SPARK-1791 - SVM implementation does not use threshold parameter

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1e48747 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1e48747 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1e48747
Branch: refs/heads/master Commit: d1e487473fd509f28daf28dcda856f3c2f1194ec Parents: 16ffadc
Author: Andrew Tulloch Authored: Tue May 13 17:31:27 2014 -0700
Committer: Reynold Xin Committed: Tue May 13 17:31:27 2014 -0700

--
 .../apache/spark/mllib/classification/SVM.scala |  2 +-
 .../spark/mllib/classification/SVMSuite.scala   | 37
 2 files changed, 38 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e48747/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index e052135..316ecd7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -65,7 +65,7 @@ class SVMModel private[mllib] (
       intercept: Double) = {
     val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
     threshold match {
-      case Some(t) => if (margin < 0) 0.0 else 1.0
+      case Some(t) => if (margin < t) 0.0 else 1.0
       case None => margin
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e48747/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 77d6f04..886c71d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -69,6 +69,43 @@ class SVMSuite extends FunSuite with LocalSparkContext {
     assert(numOffPredictions < input.length / 5)
   }

+  test("SVM with threshold") {
+    val nPoints = 1
+
+    // NOTE: Intercept should be small for generating equal 0s and 1s
+    val A = 0.01
+    val B = -1.5
+    val C = 1.0
+
+    val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+    val testRDD = sc.parallelize(testData, 2)
+    testRDD.cache()
+
+    val svm = new SVMWithSGD().setIntercept(true)
+    svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+    val model = svm.run(testRDD)
+
+    val validationData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 17)
+    val validationRDD = sc.parallelize(validationData, 2)
+
+    // Test prediction on RDD.
+
+    var predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) != predictions.length)
+
+    // High threshold makes all the predictions 0.0
+    model.setThreshold(1.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 0.0) == predictions.length)
+
+    // Low threshold makes all the predictions 1.0
+    model.setThreshold(-1.0)
+    predictions = model.predict(validationRDD.map(_.features)).collect()
+    assert(predictions.count(_ == 1.0) == predictions.length)
+  }
+
   test("SVM using local random SGD") {
     val nPoints = 1
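With the same change now on master, application code can rely on the two prediction modes behaving as documented. A rough usage sketch against the 1.0 MLlib API; the tiny dataset and threshold value are made up, and `clearThreshold()` is assumed to be the companion setter for obtaining raw margins:

```
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

object ThresholdUsageSketch {
  // Assumes an existing SparkContext; the two-point training set is only illustrative.
  def demo(sc: SparkContext): Unit = {
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 2.0)),
      LabeledPoint(0.0, Vectors.dense(-1.0, -2.0))))

    val model = SVMWithSGD.train(training, 100)
    val points = sc.parallelize(Seq(Vectors.dense(0.5, 1.0)))

    // With this fix, a custom threshold is actually honored: margins below 0.5
    // map to 0.0 instead of being cut at the hard-coded 0.0.
    model.setThreshold(0.5)
    val labels = model.predict(points).collect()

    // Clearing the threshold yields raw margins, e.g. for ROC analysis.
    model.clearThreshold()
    val margins = model.predict(points).collect()

    println(s"labels: ${labels.mkString(",")}; margins: ${margins.mkString(",")}")
  }
}
```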
git commit: SPARK-571: forbid return statements in cleaned closures
Repository: spark Updated Branches: refs/heads/master 52d905296 -> 16ffadcc4

SPARK-571: forbid return statements in cleaned closures

This patch checks top-level closure arguments to `ClosureCleaner.clean` for `return` statements and raises an exception if it finds any. This is mainly a user-friendliness addition, since programs with return statements in closure arguments will currently fail upon RDD actions with a less-than-intuitive error message.

Author: William Benton

Closes #717 from willb/spark-571 and squashes the following commits:
c41eb7d [William Benton] Another test case for SPARK-571
30c42f4 [William Benton] Stylistic cleanups
559b16b [William Benton] Stylistic cleanups from review
de13b79 [William Benton] Style fixes
295b6a5 [William Benton] Forbid return statements in closure arguments.
b017c47 [William Benton] Added a test for SPARK-571

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16ffadcc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16ffadcc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16ffadcc
Branch: refs/heads/master Commit: 16ffadcc4af21430b5079dc555bcd9d8cf1fa1fa Parents: 52d9052
Author: William Benton Authored: Tue May 13 13:45:23 2014 -0700
Committer: Reynold Xin Committed: Tue May 13 13:45:23 2014 -0700

--
 .../org/apache/spark/util/ClosureCleaner.scala  | 23 +++-
 .../apache/spark/util/ClosureCleanerSuite.scala | 39 +++-
 2 files changed, 60 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/16ffadcc/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 2d05e09..4916d9b 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.Set
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._

-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}

 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -108,6 +108,9 @@ private[spark] object ClosureCleaner extends Logging {
     val outerObjects = getOuterObjects(func)
     val accessedFields = Map[Class[_], Set[String]]()
+
+    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
     for (cls <- outerClasses)
       accessedFields(cls) = Set[String]()
     for (cls <- func.getClass :: innerClasses)
@@ -181,6 +184,24 @@ private[spark] object ClosureCleaner extends Logging {
 }

 private[spark]
+class ReturnStatementFinder extends ClassVisitor(ASM4) {
+  override def visitMethod(access: Int, name: String, desc: String,
+      sig: String, exceptions: Array[String]): MethodVisitor = {
+    if (name.contains("apply")) {
+      new MethodVisitor(ASM4) {
+        override def visitTypeInsn(op: Int, tp: String) {
+          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+            throw new SparkException("Return statements aren't allowed in Spark closures")
+          }
+        }
+      }
+    } else {
+      new MethodVisitor(ASM4) {}
+    }
+  }
+}
+
+private[spark]
 class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {

http://git-wip-us.apache.org/repos/asf/spark/blob/16ffadcc/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index d7e48e6..054ef54 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import org.scalatest.FunSuite

 import org.apache.spark.LocalSparkContext._
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}

 class ClosureCleanerSuite extends FunSuite {
   test("closures inside an object") {
@@ -50,6 +50,19 @@ class ClosureCleanerSuite extends FunSuite {
     val obj = new TestClassWithNesting(1)
     assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 *
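The reason the check works is that a `return` inside a Scala closure compiles to a throw of `scala.runtime.NonLocalReturnControl`, which previously escaped the running task and surfaced as a confusing failure at job time. A sketch of the kind of user code the cleaner now rejects up front; the method and variable names are illustrative and not taken from the patch:

```
import org.apache.spark.{SparkConf, SparkContext, SparkException}

object ReturnInClosureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("return-check").setMaster("local"))

    // The `return` below is a non-local return: the compiler emits code that
    // instantiates scala.runtime.NonLocalReturnControl inside the closure's
    // apply method, which is exactly what ReturnStatementFinder detects.
    def firstEven(): Int = {
      sc.parallelize(1 to 10).foreach { x =>
        if (x % 2 == 0) return x // rejected at clean time after this patch
      }
      -1
    }

    try {
      firstEven()
    } catch {
      case e: SparkException => println(s"Rejected as expected: ${e.getMessage}")
    }

    // Return-free formulation of the same intent.
    val even = sc.parallelize(1 to 10).filter(_ % 2 == 0).first()
    println(s"first even value: $even")

    sc.stop()
  }
}
```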
git commit: Rollback versions for 1.0.0-rc4
Repository: spark Updated Branches: refs/heads/branch-1.0 d78e37ab5 -> 51142b773

Rollback versions for 1.0.0-rc4

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51142b77 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51142b77 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51142b77
Branch: refs/heads/branch-1.0 Commit: 51142b7732209250dd41e0a4182d02df81d7cc8b Parents: d78e37a
Author: Patrick Wendell Authored: Mon May 12 15:23:53 2014 -0700
Committer: Patrick Wendell Committed: Mon May 12 15:23:53 2014 -0700

--
 assembly/pom.xml                  | 2 +-
 bagel/pom.xml                     | 2 +-
 core/pom.xml                      | 2 +-
 examples/pom.xml                  | 2 +-
 external/flume/pom.xml            | 2 +-
 external/kafka/pom.xml            | 2 +-
 external/mqtt/pom.xml             | 2 +-
 external/twitter/pom.xml          | 2 +-
 external/zeromq/pom.xml           | 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml                    | 2 +-
 mllib/pom.xml                     | 2 +-
 pom.xml                           | 2 +-
 repl/pom.xml                      | 2 +-
 sql/catalyst/pom.xml              | 2 +-
 sql/core/pom.xml                  | 2 +-
 sql/hive/pom.xml                  | 2 +-
 streaming/pom.xml                 | 2 +-
 tools/pom.xml                     | 2 +-
 yarn/pom.xml                      | 2 +-
 yarn/stable/pom.xml               | 2 +-
 21 files changed, 21 insertions(+), 21 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/assembly/pom.xml
diff --git a/assembly/pom.xml b/assembly/pom.xml index 6e29704..6c4d46a 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/bagel/pom.xml
diff --git a/bagel/pom.xml b/bagel/pom.xml index c8ad40f..355f437 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/core/pom.xml
diff --git a/core/pom.xml b/core/pom.xml index 458e14c..bab50f5 100644 --- a/core/pom.xml +++ b/core/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/examples/pom.xml
diff --git a/examples/pom.xml b/examples/pom.xml index 9156a11..874bcd7 100644 --- a/examples/pom.xml +++ b/examples/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/flume/pom.xml
diff --git a/external/flume/pom.xml b/external/flume/pom.xml index 1cefa15..6aec215 100644 --- a/external/flume/pom.xml +++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/kafka/pom.xml
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml index cc05e69..979eb0c 100644 --- a/external/kafka/pom.xml +++ b/external/kafka/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/mqtt/pom.xml
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml index ab5afa2..7b2dc5b 100644 --- a/external/mqtt/pom.xml +++ b/external/mqtt/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../../pom.xml

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/twitter/pom.xml
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml index 7abbd4b..5766d3a 100644 --- a/external/twitter/pom.xml +++ b/external/twitter/pom.xml
@@ -21,7 +21,7 @@ org.apache.spark spark-parent -1.0.2-SNAPSHOT +1.0.0-SNAPSHOT ../../pom.xml
http://git-wip-us.apache.org/re
[3/3] git commit: BUILD: Add more content to make-distribution.sh.
BUILD: Add more content to make-distribution.sh.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/716462c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/716462c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/716462c6
Branch: refs/heads/branch-1.0 Commit: 716462c6c265325f2b38ff518ba1610fdc3a139b Parents: 9b8b737
Author: Patrick Wendell Authored: Mon May 12 23:02:54 2014 -0700
Committer: Patrick Wendell Committed: Mon May 12 23:24:40 2014 -0700

--
 make-distribution.sh | 13
 1 file changed, 13 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/716462c6/make-distribution.sh
diff --git a/make-distribution.sh b/make-distribution.sh
index 1cc2844..7a08d6b 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -171,10 +171,22 @@ echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE"
 cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
 cp $FWDIR/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"

+# Copy example sources (needed for python and SQL)
+mkdir -p "$DISTDIR/examples/src/main"
+cp -r $FWDIR/examples/src/main "$DISTDIR/examples/src/"
+
 if [ "$SPARK_HIVE" == "true" ]; then
   cp $FWDIR/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/"
 fi

+# Copy license and ASF files
+cp "$FWDIR/LICENSE" "$DISTDIR"
+cp "$FWDIR/NOTICE" "$DISTDIR"
+
+if [ -e $FWDIR/CHANGES.txt ]; then
+  cp "$FWDIR/CHANGES.txt" "$DISTDIR"
+fi
+
 # Copy other things
 mkdir "$DISTDIR"/conf
 cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
@@ -182,6 +194,7 @@ cp "$FWDIR"/conf/slaves "$DISTDIR"/conf
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/python" "$DISTDIR"
 cp -r "$FWDIR/sbin" "$DISTDIR"
+cp -r "$FWDIR/ec2" "$DISTDIR"

 # Download and copy in tachyon, if requested
 if [ "$SPARK_TACHYON" == "true" ]; then
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.0.0-rc2 [deleted] 327ab1e24
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.0.0-rc1 [deleted] c8d0eb980
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.0.0-rc4 [deleted] 4f6ab6c68
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.0.0-rc3 [deleted] e7c46933a
git commit: [SPARK-1780] Non-existent SPARK_DAEMON_OPTS is lurking around
Repository: spark Updated Branches: refs/heads/master 156df87e7 -> ba96bb3d5

[SPARK-1780] Non-existent SPARK_DAEMON_OPTS is lurking around

What they really mean is SPARK_DAEMON_***JAVA***_OPTS

Author: Andrew Or

Closes #751 from andrewor14/spark-daemon-opts and squashes the following commits:
70c41f9 [Andrew Or] SPARK_DAEMON_OPTS -> SPARK_DAEMON_JAVA_OPTS

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba96bb3d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba96bb3d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba96bb3d
Branch: refs/heads/master Commit: ba96bb3d591130075763706526f86fb2aaffa3ae Parents: 156df87
Author: Andrew Or Authored: Mon May 12 19:42:35 2014 -0700
Committer: Patrick Wendell Committed: Mon May 12 19:42:35 2014 -0700

--
 conf/spark-env.sh.template                            | 2 +-
 core/src/main/scala/org/apache/spark/SparkConf.scala  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ba96bb3d/conf/spark-env.sh.template
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index f906be6..4479e1e 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -39,5 +39,5 @@
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
-# - SPARK_DAEMON_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
+# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

http://git-wip-us.apache.org/repos/asf/spark/blob/ba96bb3d/core/src/main/scala/org/apache/spark/SparkConf.scala
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index bd21fdc..8006166 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -247,7 +247,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
           | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
           | - ./spark-submit with --driver-java-options to set -X options for a driver
           | - spark.executor.extraJavaOptions to set -X options for executors
-          | - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
+          | - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
           """.stripMargin
         logError(error)
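The corrected message distinguishes daemon-level options from per-application ones. A minimal sketch of the programmatic route it recommends for application JVM flags; the app name and flag value are illustrative, while SPARK_DAEMON_JAVA_OPTS in conf/spark-env.sh remains the knob for the standalone master and worker daemons themselves:

```
import org.apache.spark.{SparkConf, SparkContext}

object DaemonOptsExample {
  def main(args: Array[String]): Unit = {
    // Per-application JVM options go through SparkConf or spark-submit;
    // SPARK_DAEMON_JAVA_OPTS only affects the standalone daemons.
    val conf = new SparkConf()
      .setAppName("daemon-opts-example")
      .setMaster("local[2]")
      .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails") // illustrative flag

    val sc = new SparkContext(conf)
    // ... run jobs ...
    sc.stop()
  }
}
```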