git commit: [SQL] Improve column pruning.

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 721194bda -> f66f76648


[SQL] Improve column pruning.

Fixed a bug that was preventing us from ever pruning beneath Joins.

## TPC-DS Q3
### Before:
```
Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2]
 Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150)
  Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79]
   Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43]
    HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight
     Exchange (HashPartitioning [ss_item_sk#36:30], 150)
      HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight
       Exchange (HashPartitioning [d_date_sk#6:0], 150)
        Filter (d_moy#14:8 = 12)
         HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None
       Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150)
        HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#57:0], 150)
      Filter (i_manufact_id#70:13 = 436)
       HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None
```
### After
```
Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162]
 Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150)
  Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239]
   Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0]
    HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight
     Exchange (HashPartitioning [ss_item_sk#196:2], 150)
      Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3]
       HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight
        Exchange (HashPartitioning [d_date_sk#166:0], 150)
         Project [d_date_sk#166:0,d_year#172:1]
          Filter (d_moy#174:2 = 12)
           HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None
        Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150)
         HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#217:1], 150)
      Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2]
       Filter (i_manufact_id#230:3 = 436)
        HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None
```
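
The actual fix lives in Catalyst's Optimizer; as a toy illustration of the idea only (plain Scala case classes, not Catalyst's `Rule`/`LogicalPlan` API), the sketch below shows why pruning must recurse through a Join: the columns required above the join, plus the join keys, are pushed down to each side so the scans read only what is needed.

```
// Toy logical plan: just enough structure to illustrate pruning beneath a join.
sealed trait Plan { def output: Set[String] }
case class Scan(table: String, columns: Set[String]) extends Plan { def output = columns }
case class Project(columns: Set[String], child: Plan) extends Plan { def output = columns }
case class Join(left: Plan, right: Plan, keys: Set[String]) extends Plan {
  def output = left.output ++ right.output
}

// Push the set of required columns down through the tree. At a Join, the join
// keys are added to the requirement before recursing into each side.
def prune(plan: Plan, required: Set[String]): Plan = plan match {
  case Scan(table, columns)    => Scan(table, columns intersect required)
  case Project(columns, child) => Project(columns, prune(child, columns))
  case Join(left, right, keys) =>
    val needed = required ++ keys
    Join(prune(left, needed), prune(right, needed), keys)
}

// A Q3-like shape: only d_year and ss_ext_sales_price survive the top Project.
val q3ish = Project(Set("d_year", "ss_ext_sales_price"),
  Join(
    Scan("date_dim", Set("d_date_sk", "d_year", "d_moy")),
    Scan("store_sales", Set("ss_sold_date_sk", "ss_item_sk", "ss_ext_sales_price")),
    Set("d_date_sk", "ss_sold_date_sk")))

// prune(q3ish, q3ish.output) narrows both scans to the referenced columns plus
// the join keys, mirroring the slimmer HiveTableScans in the "After" plan above.
```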

Author: Michael Armbrust 

Closes #729 from marmbrus/fixPruning and squashes the following commits:

5feeff0 [Michael Armbrust] Improve column pruning.
(cherry picked from commit 6ce0884446d3571fd6e9d967a080a59c657543b1)

Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f66f7664
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f66f7664
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f66f7664

Branch: refs/heads/branch-1.0
Commit: f66f76648d32f2ca274b623db395df8a9c6e7d64
Parents: 721194b
Author: Michael Armbrust 
Authored: Tue May 13 23:27:22 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 23:27:29 2014 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala| 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--

git commit: Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 92b0ec9ac -> 721194bda


Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per 
partition"

This reverts commit 66fe4797a845bb1a2728dcdb2d7371f0e90da867.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/721194bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/721194bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/721194bd

Branch: refs/heads/branch-1.0
Commit: 721194bdaa54429e76bb8b527154cdfd9c9d0e37
Parents: 92b0ec9
Author: Patrick Wendell 
Authored: Tue May 13 23:25:19 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 23:25:19 2014 -0700

--
 .../scala/org/apache/spark/Partitioner.scala| 61 
 .../org/apache/spark/PartitioningSuite.scala| 34 ---
 2 files changed, 95 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/721194bd/core/src/main/scala/org/apache/spark/Partitioner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 6274796..9155159 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   false
   }
 }
-
-/**
- * A [[org.apache.spark.Partitioner]] that partitions records into specified 
bounds
- * Default value is 1000. Once all partitions have bounds elements, the 
partitioner
- * allocates 1 element per partition so eventually the smaller partitions are 
at most
- * off by 1 key compared to the larger partitions.
- */
-class BoundaryPartitioner[K : Ordering : ClassTag, V](
-partitions: Int,
-@transient rdd: RDD[_ <: 
Product2[K,V]],
-private val boundary: Int 
= 1000)
-  extends Partitioner {
-
-  // this array keeps track of keys assigned to a partition
-  // counts[0] refers to # of keys in partition 0 and so on
-  private val counts: Array[Int] = {
-new Array[Int](numPartitions)
-  }
-
-  def numPartitions = math.abs(partitions)
-
-  /*
-  * Ideally, this should've been calculated based on # partitions and total 
keys
-  * But we are not calling count on RDD here to avoid calling an action.
-   * User has the flexibility of calling count and passing in any appropriate 
boundary
-   */
-  def keysPerPartition = boundary
-
-  var currPartition = 0
-
-  /*
-  * Pick current partition for the key until we hit the bound for keys / 
partition,
-  * start allocating to next partition at that time.
-  *
-  * NOTE: In case where we have lets say 2000 keys and user says 3 partitions 
with 500
-  * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 
1001-1500 go to P3,
-  * after that, next keys go to one partition at a time. So 1501 goes to P1, 
1502 goes to P2,
-  * 1503 goes to P3 and so on.
-   */
-  def getPartition(key: Any): Int = {
-val partition = currPartition
-counts(partition) = counts(partition) + 1
-/*
-* Since we are filling up a partition before moving to next one (this 
helps in maintaining
-* order of keys, in certain cases, it is possible to end up with empty 
partitions, like
-* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition 
will be entirely
-* empty.
- */
-if(counts(currPartition) >= keysPerPartition)
-  currPartition = (currPartition + 1) % numPartitions
-partition
-  }
-
-  override def equals(other: Any): Boolean = other match {
-case r: BoundaryPartitioner[_,_] =>
-  (r.counts.sameElements(counts) && r.boundary == boundary
-&& r.currPartition == currPartition)
-case _ =>
-  false
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/721194bd/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala 
b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d40395..7c30626 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with 
SharedSparkContext with PrivateMet
 assert(descendingP4 != p4)
   }
 
-  test("BoundaryPartitioner equality") {
-// Make an RDD where all the elements are the same so that the partition 
range bounds
-// are deterministically all the same.
-val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))

git commit: [SQL] Improve column pruning.

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 7bb9a521f -> 6ce088444


[SQL] Improve column pruning.

Fixed a bug that was preventing us from ever pruning beneath Joins.

## TPC-DS Q3
### Before:
```
Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2]
 Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150)
  Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79]
   Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43]
    HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight
     Exchange (HashPartitioning [ss_item_sk#36:30], 150)
      HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight
       Exchange (HashPartitioning [d_date_sk#6:0], 150)
        Filter (d_moy#14:8 = 12)
         HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None
       Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150)
        HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#57:0], 150)
      Filter (i_manufact_id#70:13 = 436)
       HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None
```
### After
```
Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162]
 Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150)
  Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239]
   Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0]
    HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight
     Exchange (HashPartitioning [ss_item_sk#196:2], 150)
      Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3]
       HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight
        Exchange (HashPartitioning [d_date_sk#166:0], 150)
         Project [d_date_sk#166:0,d_year#172:1]
          Filter (d_moy#174:2 = 12)
           HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None
        Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150)
         HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#217:1], 150)
      Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2]
       Filter (i_manufact_id#230:3 = 436)
        HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None
```

Author: Michael Armbrust 

Closes #729 from marmbrus/fixPruning and squashes the following commits:

5feeff0 [Michael Armbrust] Improve column pruning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ce08844
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ce08844
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ce08844

Branch: refs/heads/master
Commit: 6ce0884446d3571fd6e9d967a080a59c657543b1
Parents: 7bb9a52
Author: Michael Armbrust 
Authored: Tue May 13 23:27:22 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 23:27:22 2014 -0700

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala| 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6ce08844/sql/catalyst/src

git commit: Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master c33b8dcbf -> 7bb9a521f


Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per 
partition"

This reverts commit 92cebada09a7e5a00ab48bcb350a9462949c33eb.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bb9a521
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bb9a521
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bb9a521

Branch: refs/heads/master
Commit: 7bb9a521f35eb19576c6cc2da3fd385910270e46
Parents: c33b8dc
Author: Patrick Wendell 
Authored: Tue May 13 23:24:51 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 23:24:51 2014 -0700

--
 .../scala/org/apache/spark/Partitioner.scala| 61 
 .../org/apache/spark/PartitioningSuite.scala| 34 ---
 2 files changed, 95 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7bb9a521/core/src/main/scala/org/apache/spark/Partitioner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 6274796..9155159 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   false
   }
 }
-
-/**
- * A [[org.apache.spark.Partitioner]] that partitions records into specified 
bounds
- * Default value is 1000. Once all partitions have bounds elements, the 
partitioner
- * allocates 1 element per partition so eventually the smaller partitions are 
at most
- * off by 1 key compared to the larger partitions.
- */
-class BoundaryPartitioner[K : Ordering : ClassTag, V](
-partitions: Int,
-@transient rdd: RDD[_ <: 
Product2[K,V]],
-private val boundary: Int 
= 1000)
-  extends Partitioner {
-
-  // this array keeps track of keys assigned to a partition
-  // counts[0] refers to # of keys in partition 0 and so on
-  private val counts: Array[Int] = {
-new Array[Int](numPartitions)
-  }
-
-  def numPartitions = math.abs(partitions)
-
-  /*
-  * Ideally, this should've been calculated based on # partitions and total 
keys
-  * But we are not calling count on RDD here to avoid calling an action.
-   * User has the flexibility of calling count and passing in any appropriate 
boundary
-   */
-  def keysPerPartition = boundary
-
-  var currPartition = 0
-
-  /*
-  * Pick current partition for the key until we hit the bound for keys / 
partition,
-  * start allocating to next partition at that time.
-  *
-  * NOTE: In case where we have lets say 2000 keys and user says 3 partitions 
with 500
-  * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 
1001-1500 go to P3,
-  * after that, next keys go to one partition at a time. So 1501 goes to P1, 
1502 goes to P2,
-  * 1503 goes to P3 and so on.
-   */
-  def getPartition(key: Any): Int = {
-val partition = currPartition
-counts(partition) = counts(partition) + 1
-/*
-* Since we are filling up a partition before moving to next one (this 
helps in maintaining
-* order of keys, in certain cases, it is possible to end up with empty 
partitions, like
-* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition 
will be entirely
-* empty.
- */
-if(counts(currPartition) >= keysPerPartition)
-  currPartition = (currPartition + 1) % numPartitions
-partition
-  }
-
-  override def equals(other: Any): Boolean = other match {
-case r: BoundaryPartitioner[_,_] =>
-  (r.counts.sameElements(counts) && r.boundary == boundary
-&& r.currPartition == currPartition)
-case _ =>
-  false
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7bb9a521/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala 
b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d40395..7c30626 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with 
SharedSparkContext with PrivateMet
 assert(descendingP4 != p4)
   }
 
-  test("BoundaryPartitioner equality") {
-// Make an RDD where all the elements are the same so that the partition 
range bounds
-// are deterministically all the same.
-val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
-
-v

git commit: SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 f6323eb3b -> 5c8e8de99


SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo

This was used in the past to have a cache of deserialized ShuffleMapTasks, but 
that's been removed, so there's no need for a lock. It slows down Spark when 
task descriptions are large, e.g. due to large lineage graphs or local 
variables.

Author: Sandeep 

Closes #707 from techaddict/SPARK-1775 and squashes the following commits:

18d8ebf [Sandeep] SPARK-1775: Unneeded lock in ShuffleMapTask.deserializeInfo 
This was used in the past to have a cache of deserialized ShuffleMapTasks, but 
that's been removed, so there's no need for a lock. It slows down Spark when 
task descriptions are large, e.g. due to large lineage graphs or local 
variables.
(cherry picked from commit 7db47c463fefc244e9c100d4aab90451c3828261)

Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c8e8de9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c8e8de9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c8e8de9

Branch: refs/heads/branch-1.0
Commit: 5c8e8de99ffa5aadc1a130c9a3cbeb3c4936eb71
Parents: f6323eb
Author: Sandeep 
Authored: Thu May 8 22:30:17 2014 -0700
Committer: Patrick Wendell 
Committed: Thu May 8 22:30:58 2014 -0700

--
 .../org/apache/spark/scheduler/ShuffleMapTask.scala | 16 +++-
 1 file changed, 7 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c8e8de9/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 4b0324f..9ba586f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -57,15 +57,13 @@ private[spark] object ShuffleMapTask {
   }
 
   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], 
ShuffleDependency[_,_]) = {
-synchronized {
-  val loader = Thread.currentThread.getContextClassLoader
-  val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-  val ser = SparkEnv.get.closureSerializer.newInstance()
-  val objIn = ser.deserializeStream(in)
-  val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-  val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
-  (rdd, dep)
-}
+val loader = Thread.currentThread.getContextClassLoader
+val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+val ser = SparkEnv.get.closureSerializer.newInstance()
+val objIn = ser.deserializeStream(in)
+val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
+(rdd, dep)
   }
 
   // Since both the JarSet and FileSet have the same format this is used for 
both.



git commit: [SQL] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext.

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 ef5e9d70f -> 618b3e6e7


[SQL] Make it possible to create Java/Python SQLContexts from an existing Scala 
SQLContext.
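
A usage sketch of what this enables, based on the constructors in the diff below (assumes a local SparkContext; illustrative only):

```
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.api.java.JavaSQLContext

val sc = new SparkContext("local", "example")

// Previously a JavaSQLContext always built its own SQLContext from a
// JavaSparkContext; now an existing Scala SQLContext can be wrapped directly.
val sqlContext = new SQLContext(sc)
val javaSqlContext = new JavaSQLContext(sqlContext)
```

On the Python side, the new optional `sqlContext` argument plays the same role: passing an existing JVM SQLContext to `SQLContext(sc, sqlContext)` reuses it instead of creating a fresh one.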

Author: Michael Armbrust 

Closes #761 from marmbrus/existingContext and squashes the following commits:

4651051 [Michael Armbrust] Make it possible to create Java/Python SQLContexts 
from an existing Scala SQLContext.

(cherry picked from commit 44233865cf8020741d862d33cc660c88e9315dea)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/618b3e6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/618b3e6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/618b3e6e

Branch: refs/heads/branch-1.0
Commit: 618b3e6e7d0bb826ed333b803fe0a7214e1b14ad
Parents: ef5e9d7
Author: Michael Armbrust 
Authored: Tue May 13 21:23:51 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 21:24:01 2014 -0700

--
 python/pyspark/sql.py | 7 +--
 .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala  | 4 ++--
 2 files changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/618b3e6e/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 6789d70..bbe69e7 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -28,7 +28,7 @@ class SQLContext:
 register L{SchemaRDD}s as tables, execute sql over tables, cache tables, 
and read parquet files.
 """
 
-def __init__(self, sparkContext):
+def __init__(self, sparkContext, sqlContext = None):
 """
 Create a new SQLContext.
 
@@ -58,10 +58,13 @@ class SQLContext:
 self._jvm = self._sc._jvm
 self._pythonToJavaMap = self._jvm.PythonRDD.pythonToJavaMap
 
+if sqlContext:
+self._scala_SQLContext = sqlContext
+
 @property
 def _ssql_ctx(self):
 """
-Accessor for the JVM SparkSQL context.  Subclasses can overrite this 
property to provide
+Accessor for the JVM SparkSQL context.  Subclasses can override this 
property to provide
 their own JVM Contexts.
 """
 if not hasattr(self, '_scala_SQLContext'):

http://git-wip-us.apache.org/repos/asf/spark/blob/618b3e6e/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 57facbe..6f7d431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -33,9 +33,9 @@ import org.apache.spark.util.Utils
 /**
  * The entry point for executing Spark SQL queries from a Java program.
  */
-class JavaSQLContext(sparkContext: JavaSparkContext) {
+class JavaSQLContext(val sqlContext: SQLContext) {
 
-  val sqlContext = new SQLContext(sparkContext.sc)
+  def this(sparkContext: JavaSparkContext) = this(new 
SQLContext(sparkContext.sc))
 
   /**
* Executes a query expressed in SQL, returning the result as a JavaSchemaRDD



git commit: [SPARK-1784] Add a new partitioner to allow specifying # of keys per partition

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 618b3e6e7 -> 66fe4797a


[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition

This change adds a new partitioner which allows users
to specify # of keys per partition.
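
A usage sketch based on the class signature and test in the diff below (assumes a live `SparkContext` named `sc`; the third argument is the per-partition key bound):

```
import org.apache.spark.SparkContext._   // brings partitionBy into scope on pair RDDs

val pairs = sc.parallelize(1 to 2000).map(x => (x, x))

// 3 partitions, each filled up to the 500-key bound in turn; once every
// partition has reached the bound, remaining keys are spread one at a time.
val partitioned = pairs.partitionBy(new BoundaryPartitioner(3, pairs, 500))
```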

Author: Syed Hashmi 

Closes #721 from syedhashmi/master and squashes the following commits:

4ca94cc [Syed Hashmi] [SPARK-1784] Add a new partitioner

(cherry picked from commit 92cebada09a7e5a00ab48bcb350a9462949c33eb)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/66fe4797
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/66fe4797
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/66fe4797

Branch: refs/heads/branch-1.0
Commit: 66fe4797a845bb1a2728dcdb2d7371f0e90da867
Parents: 618b3e6
Author: Syed Hashmi 
Authored: Tue May 13 21:24:23 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 21:25:01 2014 -0700

--
 .../scala/org/apache/spark/Partitioner.scala| 61 
 .../org/apache/spark/PartitioningSuite.scala| 34 +++
 2 files changed, 95 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/66fe4797/core/src/main/scala/org/apache/spark/Partitioner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9155159..6274796 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,3 +156,64 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   false
   }
 }
+
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions records into specified 
bounds
+ * Default value is 1000. Once all partitions have bounds elements, the 
partitioner
+ * allocates 1 element per partition so eventually the smaller partitions are 
at most
+ * off by 1 key compared to the larger partitions.
+ */
+class BoundaryPartitioner[K : Ordering : ClassTag, V](
+partitions: Int,
+@transient rdd: RDD[_ <: 
Product2[K,V]],
+private val boundary: Int 
= 1000)
+  extends Partitioner {
+
+  // this array keeps track of keys assigned to a partition
+  // counts[0] refers to # of keys in partition 0 and so on
+  private val counts: Array[Int] = {
+new Array[Int](numPartitions)
+  }
+
+  def numPartitions = math.abs(partitions)
+
+  /*
+  * Ideally, this should've been calculated based on # partitions and total 
keys
+  * But we are not calling count on RDD here to avoid calling an action.
+   * User has the flexibility of calling count and passing in any appropriate 
boundary
+   */
+  def keysPerPartition = boundary
+
+  var currPartition = 0
+
+  /*
+  * Pick current partition for the key until we hit the bound for keys / 
partition,
+  * start allocating to next partition at that time.
+  *
+  * NOTE: In case where we have lets say 2000 keys and user says 3 partitions 
with 500
+  * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 
1001-1500 go to P3,
+  * after that, next keys go to one partition at a time. So 1501 goes to P1, 
1502 goes to P2,
+  * 1503 goes to P3 and so on.
+   */
+  def getPartition(key: Any): Int = {
+val partition = currPartition
+counts(partition) = counts(partition) + 1
+/*
+* Since we are filling up a partition before moving to next one (this 
helps in maintaining
+* order of keys, in certain cases, it is possible to end up with empty 
partitions, like
+* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition 
will be entirely
+* empty.
+ */
+if(counts(currPartition) >= keysPerPartition)
+  currPartition = (currPartition + 1) % numPartitions
+partition
+  }
+
+  override def equals(other: Any): Boolean = other match {
+case r: BoundaryPartitioner[_,_] =>
+  (r.counts.sameElements(counts) && r.boundary == boundary
+&& r.currPartition == currPartition)
+case _ =>
+  false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/66fe4797/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala 
b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7c30626..7d40395 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,6 +66,40 @@ class PartitioningSuite extends FunSuite with 
SharedSparkContext with PrivateMet
 assert(des

git commit: [SQL] Make it possible to create Java/Python SQLContexts from an existing Scala SQLContext.

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 753b04dea -> 44233865c


[SQL] Make it possible to create Java/Python SQLContexts from an existing Scala 
SQLContext.

Author: Michael Armbrust 

Closes #761 from marmbrus/existingContext and squashes the following commits:

4651051 [Michael Armbrust] Make it possible to create Java/Python SQLContexts 
from an existing Scala SQLContext.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44233865
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44233865
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44233865

Branch: refs/heads/master
Commit: 44233865cf8020741d862d33cc660c88e9315dea
Parents: 753b04d
Author: Michael Armbrust 
Authored: Tue May 13 21:23:51 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 21:23:51 2014 -0700

--
 python/pyspark/sql.py | 7 +--
 .../scala/org/apache/spark/sql/api/java/JavaSQLContext.scala  | 4 ++--
 2 files changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/44233865/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 6789d70..bbe69e7 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -28,7 +28,7 @@ class SQLContext:
 register L{SchemaRDD}s as tables, execute sql over tables, cache tables, 
and read parquet files.
 """
 
-def __init__(self, sparkContext):
+def __init__(self, sparkContext, sqlContext = None):
 """
 Create a new SQLContext.
 
@@ -58,10 +58,13 @@ class SQLContext:
 self._jvm = self._sc._jvm
 self._pythonToJavaMap = self._jvm.PythonRDD.pythonToJavaMap
 
+if sqlContext:
+self._scala_SQLContext = sqlContext
+
 @property
 def _ssql_ctx(self):
 """
-Accessor for the JVM SparkSQL context.  Subclasses can overrite this 
property to provide
+Accessor for the JVM SparkSQL context.  Subclasses can override this 
property to provide
 their own JVM Contexts.
 """
 if not hasattr(self, '_scala_SQLContext'):

http://git-wip-us.apache.org/repos/asf/spark/blob/44233865/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 57facbe..6f7d431 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -33,9 +33,9 @@ import org.apache.spark.util.Utils
 /**
  * The entry point for executing Spark SQL queries from a Java program.
  */
-class JavaSQLContext(sparkContext: JavaSparkContext) {
+class JavaSQLContext(val sqlContext: SQLContext) {
 
-  val sqlContext = new SQLContext(sparkContext.sc)
+  def this(sparkContext: JavaSparkContext) = this(new 
SQLContext(sparkContext.sc))
 
   /**
* Executes a query expressed in SQL, returning the result as a JavaSchemaRDD



git commit: Implement ApproximateCountDistinct for SparkSql

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 66fe4797a -> 92b0ec9ac


Implement ApproximateCountDistinct for SparkSql

Add the implementation for ApproximateCountDistinct to SparkSql. We use the
HyperLogLog algorithm implemented in stream-lib and do the count in two phases:
1) count the number of distinct elements in each partition, and 2) merge the
HyperLogLog results from the different partitions.

A simple serializer and test cases are added as well.
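
A minimal local sketch of the two-phase idea, using stream-lib's `HyperLogLog` directly (plain Scala collections stand in for RDD partitions; this is not the serializer-wrapped form used in the patch):

```
import com.clearspring.analytics.stream.cardinality.HyperLogLog

// Two local "partitions" stand in for the partitions of an RDD.
val partitions = Seq(Seq("a", "b", "c", "a"), Seq("c", "d", "e"))

// Phase 1: build one HyperLogLog sketch per partition.
val perPartition = partitions.map { part =>
  val hll = new HyperLogLog(0.05)   // relative standard deviation of the estimate
  part.foreach(hll.offer)
  hll
}

// Phase 2: merge the per-partition sketches and read off the estimate.
val merged = perPartition.reduce { (a, b) => a.addAll(b); a }
println(merged.cardinality())       // approximately 5 distinct values
```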

Author: larvaboy 

Closes #737 from larvaboy/master and squashes the following commits:

bd8ef3f [larvaboy] Add support of user-provided standard deviation to 
ApproxCountDistinct.
9ba8360 [larvaboy] Fix alignment and null handling issues.
95b4067 [larvaboy] Add a test case for count distinct and approximate count 
distinct.
f57917d [larvaboy] Add the parser for the approximate count.
a2d5d10 [larvaboy] Add ApproximateCountDistinct aggregates and functions.
7ad273a [larvaboy] Add SparkSql serializer for HyperLogLog.
1d9aacf [larvaboy] Fix a minor typo in the toString method of the Count case 
class.
653542b [larvaboy] Fix a couple of minor typos.

(cherry picked from commit c33b8dcbf65a3a0c5ee5e65cd1dcdbc7da36aa5f)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92b0ec9a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92b0ec9a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92b0ec9a

Branch: refs/heads/branch-1.0
Commit: 92b0ec9ac9b082f8dea185ef9b462c0f3e3966e2
Parents: 66fe479
Author: larvaboy 
Authored: Tue May 13 21:26:08 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 21:26:21 2014 -0700

--
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  6 +-
 .../apache/spark/sql/catalyst/SqlParser.scala   |  7 ++
 .../sql/catalyst/expressions/aggregates.scala   | 78 +++-
 .../sql/execution/SparkSqlSerializer.scala  | 17 +
 .../org/apache/spark/sql/SQLQuerySuite.scala| 21 +-
 5 files changed, 122 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/92b0ec9a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala 
b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 5efb438..bc6d204 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -217,7 +217,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative 
standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. 
Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. 
Uses the provided
+   * more accurate counts but increase the memory footprint and vice versa. 
Uses the provided
* Partitioner to partition the output RDD.
*/
   def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): 
RDD[(K, Long)] = {
@@ -232,7 +232,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative 
standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. 
Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. 
HashPartitions the
+   * more accurate counts but increase the memory footprint and vice versa. 
HashPartitions the
* output RDD into numPartitions.
*
*/
@@ -244,7 +244,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Return approximate number of distinct values for each key this RDD.
* The accuracy of approximation can be controlled through the relative 
standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. 
Lower values result in
-   * more accurate counts but increase the memory footprint and vise versa. 
The default value of
+   * more accurate counts but increase the memory footprint and vice versa. 
The default value of
* relativeSD is 0.05. Hash-partitions the output RDD using the existing 
partitioner/parallelism
* level.
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/92b0ec9a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sq

git commit: [SPARK-1784] Add a new partitioner to allow specifying # of keys per partition

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 44233865c -> 92cebada0


[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition

This change adds a new partitioner which allows users
to specify # of keys per partition.

Author: Syed Hashmi 

Closes #721 from syedhashmi/master and squashes the following commits:

4ca94cc [Syed Hashmi] [SPARK-1784] Add a new partitioner


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92cebada
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92cebada
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92cebada

Branch: refs/heads/master
Commit: 92cebada09a7e5a00ab48bcb350a9462949c33eb
Parents: 4423386
Author: Syed Hashmi 
Authored: Tue May 13 21:24:23 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 21:24:23 2014 -0700

--
 .../scala/org/apache/spark/Partitioner.scala| 61 
 .../org/apache/spark/PartitioningSuite.scala| 34 +++
 2 files changed, 95 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/92cebada/core/src/main/scala/org/apache/spark/Partitioner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala 
b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 9155159..6274796 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,3 +156,64 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   false
   }
 }
+
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions records into specified 
bounds
+ * Default value is 1000. Once all partitions have bounds elements, the 
partitioner
+ * allocates 1 element per partition so eventually the smaller partitions are 
at most
+ * off by 1 key compared to the larger partitions.
+ */
+class BoundaryPartitioner[K : Ordering : ClassTag, V](
+partitions: Int,
+@transient rdd: RDD[_ <: 
Product2[K,V]],
+private val boundary: Int 
= 1000)
+  extends Partitioner {
+
+  // this array keeps track of keys assigned to a partition
+  // counts[0] refers to # of keys in partition 0 and so on
+  private val counts: Array[Int] = {
+new Array[Int](numPartitions)
+  }
+
+  def numPartitions = math.abs(partitions)
+
+  /*
+  * Ideally, this should've been calculated based on # partitions and total 
keys
+  * But we are not calling count on RDD here to avoid calling an action.
+   * User has the flexibility of calling count and passing in any appropriate 
boundary
+   */
+  def keysPerPartition = boundary
+
+  var currPartition = 0
+
+  /*
+  * Pick current partition for the key until we hit the bound for keys / 
partition,
+  * start allocating to next partition at that time.
+  *
+  * NOTE: In case where we have lets say 2000 keys and user says 3 partitions 
with 500
+  * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 
1001-1500 go to P3,
+  * after that, next keys go to one partition at a time. So 1501 goes to P1, 
1502 goes to P2,
+  * 1503 goes to P3 and so on.
+   */
+  def getPartition(key: Any): Int = {
+val partition = currPartition
+counts(partition) = counts(partition) + 1
+/*
+* Since we are filling up a partition before moving to next one (this 
helps in maintaining
+* order of keys, in certain cases, it is possible to end up with empty 
partitions, like
+* 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition 
will be entirely
+* empty.
+ */
+if(counts(currPartition) >= keysPerPartition)
+  currPartition = (currPartition + 1) % numPartitions
+partition
+  }
+
+  override def equals(other: Any): Boolean = other match {
+case r: BoundaryPartitioner[_,_] =>
+  (r.counts.sameElements(counts) && r.boundary == boundary
+&& r.currPartition == currPartition)
+case _ =>
+  false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/92cebada/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala 
b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7c30626..7d40395 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,6 +66,40 @@ class PartitioningSuite extends FunSuite with 
SharedSparkContext with PrivateMet
 assert(descendingP4 != p4)
   }
 
+  test("BoundaryPartitioner equality") {
+// Make an RDD where all the elemen

git commit: [SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 5c0dafc2c -> 753b04dea


[SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath

JIRA issue: [SPARK-1527](https://issues.apache.org/jira/browse/SPARK-1527)

getName() only gets the last component of the file path. When deleting 
test-generated directories,
we should pass the generated directory's absolute path to DiskBlockManager.
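
For illustration (hypothetical paths), the difference between the two calls:

```
import java.io.File

val rootDir0 = new File("/tmp/spark-tests/blockmgr-0")   // hypothetical temp dir
rootDir0.getName           // "blockmgr-0"                  -- last path component only
rootDir0.getAbsolutePath   // "/tmp/spark-tests/blockmgr-0" -- what DiskBlockManager needs
```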

Author: Ye Xianjin 

This patch had conflicts when merged, resolved by
Committer: Patrick Wendell 

Closes #436 from advancedxy/SPARK-1527 and squashes the following commits:

4678bab [Ye Xianjin] change rootDir*.getname to rootDir*.getAbsolutePath so the 
temporary directories are deleted when the test is finished.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/753b04de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/753b04de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/753b04de

Branch: refs/heads/master
Commit: 753b04dea4b04ba9d0dd0011f00e9d70367e76fc
Parents: 5c0dafc
Author: Ye Xianjin 
Authored: Tue May 13 19:03:51 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 19:03:51 2014 -0700

--
 .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/753b04de/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 2167718..aaa7714 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -52,7 +52,7 @@ class DiskBlockManagerSuite extends FunSuite with 
BeforeAndAfterEach with Before
 rootDir0.deleteOnExit()
 rootDir1 = Files.createTempDir()
 rootDir1.deleteOnExit()
-rootDirs = rootDir0.getName + "," + rootDir1.getName
+rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath
 println("Created root dirs: " + rootDirs)
   }
 



git commit: [SPARK-1816] LiveListenerBus dies if a listener throws an exception

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d6994f4e6 -> 3892ec584


[SPARK-1816] LiveListenerBus dies if a listener throws an exception

The solution is to wrap a try / catch / log around the posting of each event to 
each listener.
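
A minimal sketch of that shape (illustrative names, not Spark's exact listener API): each listener is posted to inside its own try/catch, so a misbehaving listener is logged rather than killing the bus thread.

```
trait Listener { def onEvent(event: String): Unit }

def postToAll(event: String, listeners: Seq[Listener]): Unit = {
  listeners.foreach { listener =>
    try {
      listener.onEvent(event)
    } catch {
      // Log and keep going; the remaining listeners still receive the event.
      case e: Exception =>
        Console.err.println(s"Listener ${listener.getClass.getName} threw an exception: $e")
    }
  }
}
```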

Author: Andrew Or 

Closes #759 from andrewor14/listener-die and squashes the following commits:

aee5107 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
listener-die
370939f [Andrew Or] Remove two layers of indirection
422d278 [Andrew Or] Explicitly throw an exception instead of 1 / 0
0df0e2a [Andrew Or] Try/catch and log exceptions when posting events
(cherry picked from commit 5c0dafc2c8734a421206a808b73be67b66264dd7)

Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3892ec58
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3892ec58
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3892ec58

Branch: refs/heads/branch-1.0
Commit: 3892ec584706a0ee122062ab896a7aca0ff02d93
Parents: d6994f4
Author: Andrew Or 
Authored: Tue May 13 18:32:32 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 18:32:44 2014 -0700

--
 .../spark/scheduler/LiveListenerBus.scala   | 36 ++
 .../spark/scheduler/SparkListenerBus.scala  | 50 +---
 .../scala/org/apache/spark/util/Utils.scala |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala| 50 ++--
 4 files changed, 109 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3892ec58/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index dec3316..36a6e63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -42,7 +43,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus 
with Logging {
 
   private val listenerThread = new Thread("SparkListenerBus") {
 setDaemon(true)
-override def run() {
+override def run(): Unit = Utils.logUncaughtExceptions {
   while (true) {
 eventLock.acquire()
 // Atomically remove and process this event
@@ -77,11 +78,8 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
 val eventAdded = eventQueue.offer(event)
 if (eventAdded) {
   eventLock.release()
-} else if (!queueFullErrorMessageLogged) {
-  logError("Dropping SparkListenerEvent because no remaining room in event 
queue. " +
-"This likely means one of the SparkListeners is too slow and cannot 
keep up with the " +
-"rate at which tasks are being started by the scheduler.")
-  queueFullErrorMessageLogged = true
+} else {
+  logQueueFullErrorMessage()
 }
   }
 
@@ -96,14 +94,19 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   if (System.currentTimeMillis > finishTime) {
 return false
   }
-  /* Sleep rather than using wait/notify, because this is used only for 
testing and wait/notify
-   * add overhead in the general case. */
+  /* Sleep rather than using wait/notify, because this is used only for 
testing and
+   * wait/notify add overhead in the general case. */
   Thread.sleep(10)
 }
 true
   }
 
   /**
+   * For testing only. Return whether the listener daemon thread is still 
alive.
+   */
+  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
+
+  /**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once 
belonged to this queue
@@ -111,6 +114,23 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
*/
   def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
 
+  /**
+   * Log an error message to indicate that the event queue is full. Do this 
only once.
+   */
+  private def logQueueFullErrorMessage(): Unit = {
+if (!queueFullErrorMessageLogged) {
+  if (listenerThread.isAlive) {
+logError("Dropping SparkListenerEvent because no remaining room in 
event queue. " +
+  "This likely means one of the SparkListeners is too slow and cannot 
keep up with" +
+  "the rate at which tasks are being started by the schedu

git commit: [SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 3892ec584 -> ef5e9d70f


[SPARK-1527] change rootDir*.getName to rootDir*.getAbsolutePath

JIRA issue: [SPARK-1527](https://issues.apache.org/jira/browse/SPARK-1527)

getName() only gets the last component of the file path. When deleting 
test-generated directories,
we should pass the generated directory's absolute path to DiskBlockManager.

Author: Ye Xianjin 

This patch had conflicts when merged, resolved by
Committer: Patrick Wendell 

Closes #436 from advancedxy/SPARK-1527 and squashes the following commits:

4678bab [Ye Xianjin] change rootDir*.getname to rootDir*.getAbsolutePath so the 
temporary directories are deleted when the test is finished.
(cherry picked from commit 753b04dea4b04ba9d0dd0011f00e9d70367e76fc)

Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef5e9d70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef5e9d70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef5e9d70

Branch: refs/heads/branch-1.0
Commit: ef5e9d70fafe9b819a6351fd041d0466e5c1d42d
Parents: 3892ec5
Author: Ye Xianjin 
Authored: Tue May 13 19:03:51 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 19:04:23 2014 -0700

--
 .../scala/org/apache/spark/storage/DiskBlockManagerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef5e9d70/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 2167718..aaa7714 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -52,7 +52,7 @@ class DiskBlockManagerSuite extends FunSuite with 
BeforeAndAfterEach with Before
 rootDir0.deleteOnExit()
 rootDir1 = Files.createTempDir()
 rootDir1.deleteOnExit()
-rootDirs = rootDir0.getName + "," + rootDir1.getName
+rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath
 println("Created root dirs: " + rootDirs)
   }
 



git commit: [SPARK-1816] LiveListenerBus dies if a listener throws an exception

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master d1e487473 -> 5c0dafc2c


[SPARK-1816] LiveListenerBus dies if a listener throws an exception

The solution is to wrap a try / catch / log around the posting of each event to 
each listener.

Author: Andrew Or 

Closes #759 from andrewor14/listener-die and squashes the following commits:

aee5107 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
listener-die
370939f [Andrew Or] Remove two layers of indirection
422d278 [Andrew Or] Explicitly throw an exception instead of 1 / 0
0df0e2a [Andrew Or] Try/catch and log exceptions when posting events


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c0dafc2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c0dafc2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c0dafc2

Branch: refs/heads/master
Commit: 5c0dafc2c8734a421206a808b73be67b66264dd7
Parents: d1e4874
Author: Andrew Or 
Authored: Tue May 13 18:32:32 2014 -0700
Committer: Patrick Wendell 
Committed: Tue May 13 18:32:32 2014 -0700

--
 .../spark/scheduler/LiveListenerBus.scala   | 36 ++
 .../spark/scheduler/SparkListenerBus.scala  | 50 +---
 .../scala/org/apache/spark/util/Utils.scala |  2 +-
 .../spark/scheduler/SparkListenerSuite.scala| 50 ++--
 4 files changed, 109 insertions(+), 29 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c0dafc2/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index dec3316..36a6e63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
 
 import org.apache.spark.Logging
+import org.apache.spark.util.Utils
 
 /**
  * Asynchronously passes SparkListenerEvents to registered SparkListeners.
@@ -42,7 +43,7 @@ private[spark] class LiveListenerBus extends SparkListenerBus 
with Logging {
 
   private val listenerThread = new Thread("SparkListenerBus") {
 setDaemon(true)
-override def run() {
+override def run(): Unit = Utils.logUncaughtExceptions {
   while (true) {
 eventLock.acquire()
 // Atomically remove and process this event
@@ -77,11 +78,8 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
 val eventAdded = eventQueue.offer(event)
 if (eventAdded) {
   eventLock.release()
-} else if (!queueFullErrorMessageLogged) {
-  logError("Dropping SparkListenerEvent because no remaining room in event 
queue. " +
-"This likely means one of the SparkListeners is too slow and cannot 
keep up with the " +
-"rate at which tasks are being started by the scheduler.")
-  queueFullErrorMessageLogged = true
+} else {
+  logQueueFullErrorMessage()
 }
   }
 
@@ -96,14 +94,19 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   if (System.currentTimeMillis > finishTime) {
 return false
   }
-  /* Sleep rather than using wait/notify, because this is used only for 
testing and wait/notify
-   * add overhead in the general case. */
+  /* Sleep rather than using wait/notify, because this is used only for 
testing and
+   * wait/notify add overhead in the general case. */
   Thread.sleep(10)
 }
 true
   }
 
   /**
+   * For testing only. Return whether the listener daemon thread is still 
alive.
+   */
+  def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive }
+
+  /**
* Return whether the event queue is empty.
*
* The use of synchronized here guarantees that all events that once 
belonged to this queue
@@ -111,6 +114,23 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
*/
   def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty }
 
+  /**
+   * Log an error message to indicate that the event queue is full. Do this 
only once.
+   */
+  private def logQueueFullErrorMessage(): Unit = {
+if (!queueFullErrorMessageLogged) {
+  if (listenerThread.isAlive) {
+logError("Dropping SparkListenerEvent because no remaining room in 
event queue. " +
+  "This likely means one of the SparkListeners is too slow and cannot 
keep up with" +
+  "the rate at which tasks are being started by the scheduler.")
+  } else {
+logError("SparkListenerBus thread is dead! This means 
SparkListenerEvents hav

git commit: SPARK-1791 - SVM implementation does not use threshold parameter

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d08e9604f -> d6994f4e6


SPARK-1791 - SVM implementation does not use threshold parameter

Summary:
https://issues.apache.org/jira/browse/SPARK-1791

Simple fix, and backward compatible, since

- anyone who set the threshold was getting completely wrong answers.
- anyone who did not set the threshold had the default 0.0 value for the 
threshold anyway.
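
The change itself is a one-character predicate fix; as a standalone illustration of the before/after behaviour (not MLlib's API):

```
// Old: the configured threshold was ignored and 0.0 was always used.
def predictOld(margin: Double, threshold: Double): Double = if (margin < 0) 0.0 else 1.0
// New: the margin is compared against the threshold that was actually set.
def predictNew(margin: Double, threshold: Double): Double = if (margin < threshold) 0.0 else 1.0

predictOld(0.5, 1.0)   // 1.0 -- wrong once a non-zero threshold is set
predictNew(0.5, 1.0)   // 0.0 -- margin falls below the configured threshold
```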

Test Plan:
Unit test added that is verified to fail under the old implementation,
and pass under the new implementation.

Reviewers:

CC:

Author: Andrew Tulloch 

Closes #725 from ajtulloch/SPARK-1791-SVM and squashes the following commits:

770f55d [Andrew Tulloch] SPARK-1791 - SVM implementation does not use threshold 
parameter

(cherry picked from commit d1e487473fd509f28daf28dcda856f3c2f1194ec)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6994f4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6994f4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6994f4e

Branch: refs/heads/branch-1.0
Commit: d6994f4e67c9ab98f7a707fc744939dc0c9107cf
Parents: d08e960
Author: Andrew Tulloch 
Authored: Tue May 13 17:31:27 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 17:31:38 2014 -0700

--
 .../apache/spark/mllib/classification/SVM.scala |  2 +-
 .../spark/mllib/classification/SVMSuite.scala   | 37 
 2 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d6994f4e/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index e052135..316ecd7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -65,7 +65,7 @@ class SVMModel private[mllib] (
   intercept: Double) = {
 val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
 threshold match {
-  case Some(t) => if (margin < 0) 0.0 else 1.0
+  case Some(t) => if (margin < t) 0.0 else 1.0
   case None => margin
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6994f4e/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 77d6f04..886c71d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -69,6 +69,43 @@ class SVMSuite extends FunSuite with LocalSparkContext {
 assert(numOffPredictions < input.length / 5)
   }
 
+  test("SVM with threshold") {
+val nPoints = 10000
+
+// NOTE: Intercept should be small for generating equal 0s and 1s
+val A = 0.01
+val B = -1.5
+val C = 1.0
+
+val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+val testRDD = sc.parallelize(testData, 2)
+testRDD.cache()
+
+val svm = new SVMWithSGD().setIntercept(true)
+svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+val model = svm.run(testRDD)
+
+val validationData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 17)
+val validationRDD  = sc.parallelize(validationData, 2)
+
+// Test prediction on RDD.
+
+var predictions = model.predict(validationRDD.map(_.features)).collect()
+assert(predictions.count(_ == 0.0) != predictions.length)
+
+// High threshold makes all the predictions 0.0
+model.setThreshold(1.0)
+predictions = model.predict(validationRDD.map(_.features)).collect()
+assert(predictions.count(_ == 0.0) == predictions.length)
+
+// Low threshold makes all the predictions 1.0
+model.setThreshold(-1.0)
+predictions = model.predict(validationRDD.map(_.features)).collect()
+assert(predictions.count(_ == 1.0) == predictions.length)
+  }
+
   test("SVM using local random SGD") {
val nPoints = 10000
 



git commit: SPARK-1791 - SVM implementation does not use threshold parameter

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 16ffadcc4 -> d1e487473


SPARK-1791 - SVM implementation does not use threshold parameter

Summary:
https://issues.apache.org/jira/browse/SPARK-1791

Simple fix, and backward compatible, since

- anyone who set the threshold was getting completely wrong answers.
- anyone who did not set the threshold had the default 0.0 value for the 
threshold anyway.

Test Plan:
Unit test added that is verified to fail under the old implementation,
and pass under the new implementation.

Reviewers:

CC:

Author: Andrew Tulloch 

Closes #725 from ajtulloch/SPARK-1791-SVM and squashes the following commits:

770f55d [Andrew Tulloch] SPARK-1791 - SVM implementation does not use threshold 
parameter


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d1e48747
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d1e48747
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d1e48747

Branch: refs/heads/master
Commit: d1e487473fd509f28daf28dcda856f3c2f1194ec
Parents: 16ffadc
Author: Andrew Tulloch 
Authored: Tue May 13 17:31:27 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 17:31:27 2014 -0700

--
 .../apache/spark/mllib/classification/SVM.scala |  2 +-
 .../spark/mllib/classification/SVMSuite.scala   | 37 
 2 files changed, 38 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d1e48747/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index e052135..316ecd7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -65,7 +65,7 @@ class SVMModel private[mllib] (
   intercept: Double) = {
 val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
 threshold match {
-  case Some(t) => if (margin < 0) 0.0 else 1.0
+  case Some(t) => if (margin < t) 0.0 else 1.0
   case None => margin
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d1e48747/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 77d6f04..886c71d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -69,6 +69,43 @@ class SVMSuite extends FunSuite with LocalSparkContext {
 assert(numOffPredictions < input.length / 5)
   }
 
+  test("SVM with threshold") {
+val nPoints = 10000
+
+// NOTE: Intercept should be small for generating equal 0s and 1s
+val A = 0.01
+val B = -1.5
+val C = 1.0
+
+val testData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 42)
+
+val testRDD = sc.parallelize(testData, 2)
+testRDD.cache()
+
+val svm = new SVMWithSGD().setIntercept(true)
+svm.optimizer.setStepSize(1.0).setRegParam(1.0).setNumIterations(100)
+
+val model = svm.run(testRDD)
+
+val validationData = SVMSuite.generateSVMInput(A, Array[Double](B, C), nPoints, 17)
+val validationRDD  = sc.parallelize(validationData, 2)
+
+// Test prediction on RDD.
+
+var predictions = model.predict(validationRDD.map(_.features)).collect()
+assert(predictions.count(_ == 0.0) != predictions.length)
+
+// High threshold makes all the predictions 0.0
+model.setThreshold(1.0)
+predictions = model.predict(validationRDD.map(_.features)).collect()
+assert(predictions.count(_ == 0.0) == predictions.length)
+
+// Low threshold makes all the predictions 1.0
+model.setThreshold(-1.0)
+predictions = model.predict(validationRDD.map(_.features)).collect()
+assert(predictions.count(_ == 1.0) == predictions.length)
+  }
+
   test("SVM using local random SGD") {
val nPoints = 10000
 



git commit: SPARK-571: forbid return statements in cleaned closures

2014-05-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 52d905296 -> 16ffadcc4


SPARK-571: forbid return statements in cleaned closures

This patch checks top-level closure arguments to `ClosureCleaner.clean` for 
`return` statements and raises an exception if it finds any.  This is mainly a 
user-friendliness addition, since programs with return statements in closure 
arguments will currently fail upon RDD actions with a less-than-intuitive error 
message.
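
As an illustration (hypothetical user code, not part of the patch), this is the kind of closure the check rejects:

```
import org.apache.spark.SparkContext

object ReturnInClosureExample {
  // Hypothetical example: the `return` is a non-local return from sumSmall and is
  // compiled via scala.runtime.NonLocalReturnControl, so ClosureCleaner.clean now
  // fails fast with a SparkException instead of a confusing error at action time.
  def sumSmall(sc: SparkContext): Int = {
    sc.parallelize(1 to 10).map { x =>
      if (x > 5) return 0   // non-local return out of the enclosing method
      x
    }.reduce(_ + _)
  }
}
```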

Author: William Benton 

Closes #717 from willb/spark-571 and squashes the following commits:

c41eb7d [William Benton] Another test case for SPARK-571
30c42f4 [William Benton] Stylistic cleanups
559b16b [William Benton] Stylistic cleanups from review
de13b79 [William Benton] Style fixes
295b6a5 [William Benton] Forbid return statements in closure arguments.
b017c47 [William Benton] Added a test for SPARK-571


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16ffadcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16ffadcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16ffadcc

Branch: refs/heads/master
Commit: 16ffadcc4af21430b5079dc555bcd9d8cf1fa1fa
Parents: 52d9052
Author: William Benton 
Authored: Tue May 13 13:45:23 2014 -0700
Committer: Reynold Xin 
Committed: Tue May 13 13:45:23 2014 -0700

--
 .../org/apache/spark/util/ClosureCleaner.scala  | 23 +++-
 .../apache/spark/util/ClosureCleanerSuite.scala | 39 +++-
 2 files changed, 60 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/16ffadcc/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 2d05e09..4916d9b 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.Set
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, 
ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 
 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -108,6 +108,9 @@ private[spark] object ClosureCleaner extends Logging {
 val outerObjects = getOuterObjects(func)
 
 val accessedFields = Map[Class[_], Set[String]]()
+
+getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+
 for (cls <- outerClasses)
   accessedFields(cls) = Set[String]()
 for (cls <- func.getClass :: innerClasses)
@@ -181,6 +184,24 @@ private[spark] object ClosureCleaner extends Logging {
 }
 
 private[spark]
+class ReturnStatementFinder extends ClassVisitor(ASM4) {
+  override def visitMethod(access: Int, name: String, desc: String,
+  sig: String, exceptions: Array[String]): MethodVisitor = {
+if (name.contains("apply")) {
+  new MethodVisitor(ASM4) {
+override def visitTypeInsn(op: Int, tp: String) {
+  if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+    throw new SparkException("Return statements aren't allowed in Spark closures")
+  }
+}
+  }
+} else {
+  new MethodVisitor(ASM4) {}
+}
+  }
+}
+
+private[spark]
 class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends 
ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
   sig: String, exceptions: Array[String]): MethodVisitor = {

http://git-wip-us.apache.org/repos/asf/spark/blob/16ffadcc/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index d7e48e6..054ef54 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import org.scalatest.FunSuite
 
 import org.apache.spark.LocalSparkContext._
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 
 class ClosureCleanerSuite extends FunSuite {
   test("closures inside an object") {
@@ -50,6 +50,19 @@ class ClosureCleanerSuite extends FunSuite {
 val obj = new TestClassWithNesting(1)
 assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 

git commit: Rollback versions for 1.0.0-rc4

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d78e37ab5 -> 51142b773


Rollback versions for 1.0.0-rc4


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51142b77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51142b77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51142b77

Branch: refs/heads/branch-1.0
Commit: 51142b7732209250dd41e0a4182d02df81d7cc8b
Parents: d78e37a
Author: Patrick Wendell 
Authored: Mon May 12 15:23:53 2014 -0700
Committer: Patrick Wendell 
Committed: Mon May 12 15:23:53 2014 -0700

--
 assembly/pom.xml  | 2 +-
 bagel/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 examples/pom.xml  | 2 +-
 external/flume/pom.xml| 2 +-
 external/kafka/pom.xml| 2 +-
 external/mqtt/pom.xml | 2 +-
 external/twitter/pom.xml  | 2 +-
 external/zeromq/pom.xml   | 2 +-
 extras/spark-ganglia-lgpl/pom.xml | 2 +-
 graphx/pom.xml| 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 yarn/stable/pom.xml   | 2 +-
 21 files changed, 21 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 6e29704..6c4d46a 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index c8ad40f..355f437 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 458e14c..bab50f5 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9156a11..874bcd7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/flume/pom.xml
--
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index 1cefa15..6aec215 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/kafka/pom.xml
--
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index cc05e69..979eb0c 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/mqtt/pom.xml
--
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index ab5afa2..7b2dc5b 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51142b77/external/twitter/pom.xml
--
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index 7abbd4b..5766d3a 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent</artifactId>
-    <version>1.0.2-SNAPSHOT</version>
+    <version>1.0.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/re

[3/3] git commit: BUILD: Add more content to make-distribution.sh.

2014-05-13 Thread pwendell
BUILD: Add more content to make-distribution.sh.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/716462c6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/716462c6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/716462c6

Branch: refs/heads/branch-1.0
Commit: 716462c6c265325f2b38ff518ba1610fdc3a139b
Parents: 9b8b737
Author: Patrick Wendell 
Authored: Mon May 12 23:02:54 2014 -0700
Committer: Patrick Wendell 
Committed: Mon May 12 23:24:40 2014 -0700

--
 make-distribution.sh | 13 +
 1 file changed, 13 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/716462c6/make-distribution.sh
--
diff --git a/make-distribution.sh b/make-distribution.sh
index 1cc2844..7a08d6b 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -171,10 +171,22 @@ echo "Spark $VERSION built for Hadoop $SPARK_HADOOP_VERSION" > "$DISTDIR/RELEASE
 cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/lib/"
 cp $FWDIR/examples/target/scala*/spark-examples*.jar "$DISTDIR/lib/"
 
+# Copy example sources (needed for python and SQL)
+mkdir -p "$DISTDIR/examples/src/main"
+cp -r $FWDIR/examples/src/main "$DISTDIR/examples/src/" 
+
 if [ "$SPARK_HIVE" == "true" ]; then
   cp $FWDIR/lib_managed/jars/datanucleus*.jar "$DISTDIR/lib/"
 fi
 
+# Copy license and ASF files
+cp "$FWDIR/LICENSE" "$DISTDIR"
+cp "$FWDIR/NOTICE" "$DISTDIR"
+
+if [ -e $FWDIR/CHANGES.txt ]; then
+  cp "$FWDIR/CHANGES.txt" "$DISTDIR"
+fi
+
 # Copy other things
 mkdir "$DISTDIR"/conf
 cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
@@ -182,6 +194,7 @@ cp "$FWDIR"/conf/slaves "$DISTDIR"/conf
 cp -r "$FWDIR/bin" "$DISTDIR"
 cp -r "$FWDIR/python" "$DISTDIR"
 cp -r "$FWDIR/sbin" "$DISTDIR"
+cp -r "$FWDIR/ec2" "$DISTDIR"
 
 # Download and copy in tachyon, if requested
 if [ "$SPARK_TACHYON" == "true" ]; then



Git Push Summary

2014-05-13 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.0.0-rc2 [deleted] 327ab1e24


Git Push Summary

2014-05-13 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.0.0-rc1 [deleted] c8d0eb980


Git Push Summary

2014-05-13 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.0.0-rc4 [deleted] 4f6ab6c68


Git Push Summary

2014-05-13 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.0.0-rc3 [deleted] e7c46933a


git commit: [SPARK-1780] Non-existent SPARK_DAEMON_OPTS is lurking around

2014-05-13 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 156df87e7 -> ba96bb3d5


[SPARK-1780] Non-existent SPARK_DAEMON_OPTS is lurking around

What they really mean is SPARK_DAEMON_***JAVA***_OPTS

Author: Andrew Or 

Closes #751 from andrewor14/spark-daemon-opts and squashes the following 
commits:

70c41f9 [Andrew Or] SPARK_DAEMON_OPTS -> SPARK_DAEMON_JAVA_OPTS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba96bb3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba96bb3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba96bb3d

Branch: refs/heads/master
Commit: ba96bb3d591130075763706526f86fb2aaffa3ae
Parents: 156df87
Author: Andrew Or 
Authored: Mon May 12 19:42:35 2014 -0700
Committer: Patrick Wendell 
Committed: Mon May 12 19:42:35 2014 -0700

--
 conf/spark-env.sh.template   | 2 +-
 core/src/main/scala/org/apache/spark/SparkConf.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ba96bb3d/conf/spark-env.sh.template
--
diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template
index f906be6..4479e1e 100755
--- a/conf/spark-env.sh.template
+++ b/conf/spark-env.sh.template
@@ -39,5 +39,5 @@
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
 # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
-# - SPARK_DAEMON_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
+# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
 # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

http://git-wip-us.apache.org/repos/asf/spark/blob/ba96bb3d/core/src/main/scala/org/apache/spark/SparkConf.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index bd21fdc..8006166 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -247,7 +247,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   | - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
   | - ./spark-submit with --driver-java-options to set -X options for a driver
   | - spark.executor.extraJavaOptions to set -X options for executors
-  | - SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master, worker)
+  | - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
 """.stripMargin
   logError(error)
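
For reference, a hedged sketch of the per-application route the message above recommends; `DaemonOptsExample` and the flag value are illustrative only:

```
import org.apache.spark.SparkConf

object DaemonOptsExample {
  // Per-application JVM options go through SparkConf properties such as
  // spark.executor.extraJavaOptions (quoted in the message above); the GC flag
  // here is only an example value. SPARK_DAEMON_JAVA_OPTS is reserved for the
  // standalone master and worker daemons.
  val conf = new SparkConf()
    .setAppName("example")
    .set("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails")
}
```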