spark git commit: [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

2015-08-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 94692bb14 -> 01efa4f27


[SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

This PR is inspired by #8063 authored by dguy. In particular, the test Parquet 
files added here are all taken from that PR.

**Committer who merges this PR should attribute it to Damian Guy 
damian.guy@gmail.com.**



SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement 
backwards-compatibility rules defined in `parquet-format` spec. However, both 
Spark SQL and `parquet-avro` neglected the following statement in 
`parquet-format`:

> This does not affect repeated fields that are not annotated: A repeated field 
> that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated 
> by `LIST` or `MAP` should be interpreted as a required list of required 
> elements where the element type is the type of the field.

One of the consequences is that Parquet files generated by `parquet-protobuf` 
containing unannotated repeated fields are not correctly converted to Catalyst 
arrays.
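
As a concrete illustration of the rule quoted above (a sketch, not taken from the patch itself; the schema and names are illustrative), an unannotated repeated field should map to a required array of required elements:

```scala
import org.apache.spark.sql.types._

// Parquet schema as written by parquet-protobuf, with no LIST/MAP annotation:
//
//   message M {
//     repeated int32 f;
//   }
//
// Per the backwards-compatibility rule, this should be read as a required list
// of required ints, i.e. the expected Catalyst schema is:
val expectedCatalystSchema = StructType(Seq(
  StructField("f", ArrayType(IntegerType, containsNull = false), nullable = false)))
```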

This PR fixes this issue by

1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting these special repeated fields to Catalyst arrays in 
`CatalystRowConverter`.

   Two special converters, `RepeatedPrimitiveConverter` and 
`RepeatedGroupConverter`, are added. They delegate actual conversion work to a 
child `elementConverter` and accumulate elements in an `ArrayBuffer`.

   Two extra methods, `start()` and `end()`, are added to 
`ParentContainerUpdater`, so that they can be used to initialize new 
`ArrayBuffer`s for unannotated repeated fields and propagate converted array 
values upstream.
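
The accumulation pattern described above can be sketched roughly as follows (a simplified standalone sketch, not the actual `CatalystRowConverter` code; the trait and class shapes here are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the pattern: start() resets the buffer before each record,
// every converted repeated value is appended, and end() pushes the accumulated
// array to the parent container.
trait SimpleParentUpdater[T] {
  def start(): Unit = ()
  def end(): Unit = ()
  def set(value: T): Unit
}

class RepeatedFieldAccumulator[E](parent: SimpleParentUpdater[Seq[E]]) {
  private val buffer = ArrayBuffer.empty[E]

  def start(): Unit = buffer.clear()                        // new record: reset state
  def addElement(converted: E): Unit = buffer += converted  // value from the element converter
  def end(): Unit = parent.set(buffer.toSeq)                // record done: propagate the array upstream
}
```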

Author: Cheng Lian l...@databricks.com

Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes 
the following commits:

ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists

(cherry picked from commit 071bbad5db1096a548c886762b611a8484a52753)
Signed-off-by: Cheng Lian l...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01efa4f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01efa4f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01efa4f2

Branch: refs/heads/branch-1.5
Commit: 01efa4f27db1eefba9cb0fe2dec790556f3280de
Parents: 94692bb
Author: Damian Guy damian@gmail.com
Authored: Tue Aug 11 12:46:33 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Tue Aug 11 12:46:54 2015 +0800

--
 .rat-excludes   |   1 +
 .../parquet/CatalystRowConverter.scala  | 151 +++
 .../parquet/CatalystSchemaConverter.scala   |   7 +-
 .../test/resources/nested-array-struct.parquet  | Bin 0 -> 775 bytes
 .../src/test/resources/old-repeated-int.parquet | Bin 0 -> 389 bytes
 .../test/resources/old-repeated-message.parquet | Bin 0 -> 600 bytes
 .../src/test/resources/old-repeated.parquet | Bin 0 -> 432 bytes
 .../parquet-thrift-compat.snappy.parquet| Bin
 .../resources/proto-repeated-string.parquet | Bin 0 -> 411 bytes
 .../resources/proto-repeated-struct.parquet | Bin 0 -> 608 bytes
 .../proto-struct-with-array-many.parquet| Bin 0 -> 802 bytes
 .../resources/proto-struct-with-array.parquet   | Bin 0 -> 1576 bytes
 .../ParquetProtobufCompatibilitySuite.scala |  91 +++
 .../parquet/ParquetSchemaSuite.scala|  30 
 14 files changed, 247 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index 7277146..9165872 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -94,3 +94,4 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+.*parquet

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 3542dfb..ab5a6dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
 import scala.collection.JavaConversions._
-import 

spark git commit: [SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

2015-08-10 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 3c9802d94 -> 071bbad5d


[SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

This PR is inspired by #8063 authored by dguy. In particular, the test Parquet 
files added here are all taken from that PR.

**Committer who merges this PR should attribute it to Damian Guy 
damian.guy@gmail.com.**



SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement 
backwards-compatibility rules defined in `parquet-format` spec. However, both 
Spark SQL and `parquet-avro` neglected the following statement in 
`parquet-format`:

> This does not affect repeated fields that are not annotated: A repeated field 
> that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated 
> by `LIST` or `MAP` should be interpreted as a required list of required 
> elements where the element type is the type of the field.

One of the consequences is that Parquet files generated by `parquet-protobuf` 
containing unannotated repeated fields are not correctly converted to Catalyst 
arrays.

This PR fixes this issue by

1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting these special repeated fields to Catalyst arrays in 
`CatalystRowConverter`.

   Two special converters, `RepeatedPrimitiveConverter` and 
`RepeatedGroupConverter`, are added. They delegate actual conversion work to a 
child `elementConverter` and accumulate elements in an `ArrayBuffer`.

   Two extra methods, `start()` and `end()`, are added to 
`ParentContainerUpdater`, so that they can be used to initialize new 
`ArrayBuffer`s for unannotated repeated fields and propagate converted array 
values upstream.

Author: Cheng Lian l...@databricks.com

Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes 
the following commits:

ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/071bbad5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/071bbad5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/071bbad5

Branch: refs/heads/master
Commit: 071bbad5db1096a548c886762b611a8484a52753
Parents: 3c9802d
Author: Damian Guy damian@gmail.com
Authored: Tue Aug 11 12:46:33 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Tue Aug 11 12:46:33 2015 +0800

--
 .rat-excludes   |   1 +
 .../parquet/CatalystRowConverter.scala  | 151 +++
 .../parquet/CatalystSchemaConverter.scala   |   7 +-
 .../test/resources/nested-array-struct.parquet  | Bin 0 -> 775 bytes
 .../src/test/resources/old-repeated-int.parquet | Bin 0 -> 389 bytes
 .../test/resources/old-repeated-message.parquet | Bin 0 -> 600 bytes
 .../src/test/resources/old-repeated.parquet | Bin 0 -> 432 bytes
 .../parquet-thrift-compat.snappy.parquet| Bin
 .../resources/proto-repeated-string.parquet | Bin 0 -> 411 bytes
 .../resources/proto-repeated-struct.parquet | Bin 0 -> 608 bytes
 .../proto-struct-with-array-many.parquet| Bin 0 -> 802 bytes
 .../resources/proto-struct-with-array.parquet   | Bin 0 -> 1576 bytes
 .../ParquetProtobufCompatibilitySuite.scala |  91 +++
 .../parquet/ParquetSchemaSuite.scala|  30 
 14 files changed, 247 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index 7277146..9165872 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -94,3 +94,4 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+.*parquet

http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 3542dfb..ab5a6dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import 

spark git commit: [SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 071bbad5d -> 91e9389f3


[SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer 
join

This patch adds a new `SortMergeOuterJoin` operator that performs left and 
right outer joins using sort merge join.  It also refactors `SortMergeJoin` in 
order to improve performance and code clarity.
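
For intuition, here is a minimal, self-contained sketch of the left outer variant of the sort merge join idea (not Spark's actual `SortMergeOuterJoin` implementation):

```scala
// Both inputs must be sorted by key. We advance a cursor on each side, emitting
// (leftValue, Some(rightValue)) for every match and (leftValue, None) when the
// left key has no counterpart on the right.
def sortMergeLeftOuter[K: Ordering, L, R](
    left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(L, Option[R])] = {
  val ord = implicitly[Ordering[K]]
  val out = scala.collection.mutable.ArrayBuffer.empty[(L, Option[R])]
  var i = 0
  var j = 0
  while (i < left.length) {
    val (lk, lv) = left(i)
    // skip right keys that are smaller than the current left key
    while (j < right.length && ord.lt(right(j)._1, lk)) j += 1
    // emit one output row per matching right row
    var k = j
    var matched = false
    while (k < right.length && ord.equiv(right(k)._1, lk)) {
      out += ((lv, Some(right(k)._2)))
      matched = true
      k += 1
    }
    if (!matched) out += ((lv, None))
    i += 1
  }
  out
}

// Example:
// sortMergeLeftOuter(Seq(1 -> "a", 2 -> "b", 4 -> "d"), Seq(2 -> "x", 3 -> "y"))
// == Seq(("a", None), ("b", Some("x")), ("d", None))
```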

Along the way, I also performed a couple pieces of minor cleanup and 
optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since 
it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence 
for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf 
settings.

This patch incorporates several ideas from adrian-wang's patch, #5717.

Closes #5717.


Author: Josh Rosen joshro...@databricks.com
Author: Daoyuan Wang daoyuan.w...@intel.com

Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91e9389f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91e9389f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91e9389f

Branch: refs/heads/master
Commit: 91e9389f39509e63654bd4bcb7bd919eaedda910
Parents: 071bbad
Author: Josh Rosen joshro...@databricks.com
Authored: Mon Aug 10 22:04:41 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 22:04:41 2015 -0700

--
 .../sql/catalyst/expressions/JoinedRow.scala|   6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +-
 .../spark/sql/execution/RowIterator.scala   |  93 ++
 .../spark/sql/execution/SparkStrategies.scala   |  45 ++-
 .../joins/BroadcastNestedLoopJoin.scala |   5 +-
 .../sql/execution/joins/SortMergeJoin.scala | 331 +--
 .../execution/joins/SortMergeOuterJoin.scala| 251 ++
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 132 
 .../sql/execution/joins/InnerJoinSuite.scala| 180 ++
 .../sql/execution/joins/OuterJoinSuite.scala| 310 -
 .../sql/execution/joins/SemiJoinSuite.scala | 125 ---
 .../apache/spark/sql/test/SQLTestUtils.scala|   2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   2 +-
 13 files changed, 1165 insertions(+), 319 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/91e9389f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index b76757c..d3560df 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -37,20 +37,20 @@ class JoinedRow extends InternalRow {
   }
 
   /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
+  def apply(r1: InternalRow, r2: InternalRow): JoinedRow = {
 row1 = r1
 row2 = r2
 this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
+  def withLeft(newLeft: InternalRow): JoinedRow = {
 row1 = newLeft
 this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
+  def withRight(newRight: InternalRow): JoinedRow = {
 row2 = newRight
 this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/91e9389f/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f73bb04..4bf00b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -873,7 +873,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   HashAggregation ::
   Aggregation ::
   LeftSemiJoin ::
-  HashJoin ::
+  EquiJoinSelection ::
   InMemoryScans ::
   BasicOperators ::
   CartesianProduct ::


spark git commit: [SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer join

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 01efa4f27 -> f9beef998


[SPARK-9729] [SPARK-9363] [SQL] Use sort merge join for left and right outer 
join

This patch adds a new `SortMergeOuterJoin` operator that performs left and 
right outer joins using sort merge join.  It also refactors `SortMergeJoin` in 
order to improve performance and code clarity.

Along the way, I also performed a couple pieces of minor cleanup and 
optimization:

- Rename the `HashJoin` physical planner rule to `EquiJoinSelection`, since 
it's also used for non-hash joins.
- Rewrite the comment at the top of `HashJoin` to better explain the precedence 
for choosing join operators.
- Update `JoinSuite` to use `SqlTestUtils.withConf` for changing SQLConf 
settings.

This patch incorporates several ideas from adrian-wang's patch, #5717.

Closes #5717.


Author: Josh Rosen joshro...@databricks.com
Author: Daoyuan Wang daoyuan.w...@intel.com

Closes #7904 from JoshRosen/outer-join-smj and squashes 1 commit.

(cherry picked from commit 91e9389f39509e63654bd4bcb7bd919eaedda910)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9beef99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9beef99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9beef99

Branch: refs/heads/branch-1.5
Commit: f9beef9987c6a1993990e6695fb295019e5ed5d3
Parents: 01efa4f
Author: Josh Rosen joshro...@databricks.com
Authored: Mon Aug 10 22:04:41 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 22:04:50 2015 -0700

--
 .../sql/catalyst/expressions/JoinedRow.scala|   6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +-
 .../spark/sql/execution/RowIterator.scala   |  93 ++
 .../spark/sql/execution/SparkStrategies.scala   |  45 ++-
 .../joins/BroadcastNestedLoopJoin.scala |   5 +-
 .../sql/execution/joins/SortMergeJoin.scala | 331 +--
 .../execution/joins/SortMergeOuterJoin.scala| 251 ++
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 132 
 .../sql/execution/joins/InnerJoinSuite.scala| 180 ++
 .../sql/execution/joins/OuterJoinSuite.scala| 310 -
 .../sql/execution/joins/SemiJoinSuite.scala | 125 ---
 .../apache/spark/sql/test/SQLTestUtils.scala|   2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |   2 +-
 13 files changed, 1165 insertions(+), 319 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f9beef99/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
index b76757c..d3560df 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/JoinedRow.scala
@@ -37,20 +37,20 @@ class JoinedRow extends InternalRow {
   }
 
   /** Updates this JoinedRow to used point at two new base rows.  Returns 
itself. */
-  def apply(r1: InternalRow, r2: InternalRow): InternalRow = {
+  def apply(r1: InternalRow, r2: InternalRow): JoinedRow = {
 row1 = r1
 row2 = r2
 this
   }
 
   /** Updates this JoinedRow by updating its left base row.  Returns itself. */
-  def withLeft(newLeft: InternalRow): InternalRow = {
+  def withLeft(newLeft: InternalRow): JoinedRow = {
 row1 = newLeft
 this
   }
 
   /** Updates this JoinedRow by updating its right base row.  Returns itself. 
*/
-  def withRight(newRight: InternalRow): InternalRow = {
+  def withRight(newRight: InternalRow): JoinedRow = {
 row2 = newRight
 this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f9beef99/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f73bb04..4bf00b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -873,7 +873,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   HashAggregation ::
   Aggregation ::
   LeftSemiJoin ::
-  HashJoin ::
+  EquiJoinSelection ::
   InMemoryScans ::
   

spark git commit: [SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods

2015-08-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 94b2f5b32 -> 3ee2c8d16


[SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods

Adds method documentation back to `MultivariateOnlineSummarizer`; it was 
present in 1.4 but disappeared somewhere along the way to 1.5.
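
As context for the documented methods (a minimal usage sketch, not part of the patch):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// Feed samples one at a time; the summarizer maintains the statistics online.
val summarizer = new MultivariateOnlineSummarizer()
summarizer.add(Vectors.dense(1.0, 0.0, 3.0))
summarizer.add(Vectors.dense(2.0, 4.0, 0.0))

summarizer.mean         // sample mean of each dimension
summarizer.variance     // sample variance of each dimension
summarizer.count        // number of samples added (2)
summarizer.numNonzeros  // number of nonzero values in each dimension
```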

jkbradley

Author: Feynman Liang fli...@databricks.com

Closes #8045 from feynmanliang/SPARK-9755 and squashes the following commits:

af67fde [Feynman Liang] Add MultivariateOnlineSummarizer docs

(cherry picked from commit 00b655cced637e1c3b750c19266086b9dcd7c158)
Signed-off-by: Joseph K. Bradley jos...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ee2c8d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ee2c8d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ee2c8d1

Branch: refs/heads/branch-1.5
Commit: 3ee2c8d169e48e0bca3fab702466e7a855f57f8e
Parents: 94b2f5b
Author: Feynman Liang fli...@databricks.com
Authored: Mon Aug 10 11:01:45 2015 -0700
Committer: Joseph K. Bradley jos...@databricks.com
Committed: Mon Aug 10 11:01:55 2015 -0700

--
 .../mllib/stat/MultivariateOnlineSummarizer.scala   | 16 
 1 file changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3ee2c8d1/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 62da9f2..64e4be0 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -153,6 +153,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Sample mean of each dimension.
+   *
* @since 1.1.0
*/
   override def mean: Vector = {
@@ -168,6 +170,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Sample variance of each dimension.
+   *
* @since 1.1.0
*/
   override def variance: Vector = {
@@ -193,11 +197,15 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Sample size.
+   *
* @since 1.1.0
*/
   override def count: Long = totalCnt
 
   /**
+   * Number of nonzero elements in each dimension.
+   *
* @since 1.1.0
*/
   override def numNonzeros: Vector = {
@@ -207,6 +215,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Maximum value of each dimension.
+   *
* @since 1.1.0
*/
   override def max: Vector = {
@@ -221,6 +231,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Minimum value of each dimension.
+   *
* @since 1.1.0
*/
   override def min: Vector = {
@@ -235,6 +247,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * L2 (Euclidian) norm of each dimension.
+   *
* @since 1.2.0
*/
   override def normL2: Vector = {
@@ -252,6 +266,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * L1 norm of each dimension.
+   *
* @since 1.2.0
*/
   override def normL1: Vector = {





spark git commit: [SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods

2015-08-10 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 0f3366a4c -> 00b655cce


[SPARK-9755] [MLLIB] Add docs to MultivariateOnlineSummarizer methods

Adds method documentation back to `MultivariateOnlineSummarizer`; it was 
present in 1.4 but disappeared somewhere along the way to 1.5.

jkbradley

Author: Feynman Liang fli...@databricks.com

Closes #8045 from feynmanliang/SPARK-9755 and squashes the following commits:

af67fde [Feynman Liang] Add MultivariateOnlineSummarizer docs


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00b655cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00b655cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00b655cc

Branch: refs/heads/master
Commit: 00b655cced637e1c3b750c19266086b9dcd7c158
Parents: 0f3366a
Author: Feynman Liang fli...@databricks.com
Authored: Mon Aug 10 11:01:45 2015 -0700
Committer: Joseph K. Bradley jos...@databricks.com
Committed: Mon Aug 10 11:01:45 2015 -0700

--
 .../mllib/stat/MultivariateOnlineSummarizer.scala   | 16 
 1 file changed, 16 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/00b655cc/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 62da9f2..64e4be0 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -153,6 +153,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Sample mean of each dimension.
+   *
* @since 1.1.0
*/
   override def mean: Vector = {
@@ -168,6 +170,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Sample variance of each dimension.
+   *
* @since 1.1.0
*/
   override def variance: Vector = {
@@ -193,11 +197,15 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Sample size.
+   *
* @since 1.1.0
*/
   override def count: Long = totalCnt
 
   /**
+   * Number of nonzero elements in each dimension.
+   *
* @since 1.1.0
*/
   override def numNonzeros: Vector = {
@@ -207,6 +215,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Maximum value of each dimension.
+   *
* @since 1.1.0
*/
   override def max: Vector = {
@@ -221,6 +231,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * Minimum value of each dimension.
+   *
* @since 1.1.0
*/
   override def min: Vector = {
@@ -235,6 +247,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * L2 (Euclidian) norm of each dimension.
+   *
* @since 1.2.0
*/
   override def normL2: Vector = {
@@ -252,6 +266,8 @@ class MultivariateOnlineSummarizer extends 
MultivariateStatisticalSummary with S
   }
 
   /**
+   * L1 norm of each dimension.
+   *
* @since 1.2.0
*/
   override def normL1: Vector = {





spark git commit: [SPARK-9710] [TEST] Fix RPackageUtilsSuite when R is not available.

2015-08-10 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master e3fef0f9e -> 0f3366a4c


[SPARK-9710] [TEST] Fix RPackageUtilsSuite when R is not available.

RUtils.isRInstalled throws an exception if R is not installed,
instead of returning false. Fix that.

Author: Marcelo Vanzin van...@cloudera.com

Closes #8008 from vanzin/SPARK-9710 and squashes the following commits:

df72d8c [Marcelo Vanzin] [SPARK-9710] [test] Fix RPackageUtilsSuite when R is 
not available.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f3366a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f3366a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f3366a4

Branch: refs/heads/master
Commit: 0f3366a4c740147a7a7519922642912e2dd238f8
Parents: e3fef0f
Author: Marcelo Vanzin van...@cloudera.com
Authored: Mon Aug 10 10:10:40 2015 -0700
Committer: Shivaram Venkataraman shiva...@cs.berkeley.edu
Committed: Mon Aug 10 10:10:40 2015 -0700

--
 core/src/main/scala/org/apache/spark/api/r/RUtils.scala | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f3366a4/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala 
b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 93b3bea..427b2bc 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -67,7 +67,11 @@ private[spark] object RUtils {
 
   /** Check if R is installed before running tests that use R commands. */
   def isRInstalled: Boolean = {
-    val builder = new ProcessBuilder(Seq("R", "--version"))
-    builder.start().waitFor() == 0
+    try {
+      val builder = new ProcessBuilder(Seq("R", "--version"))
+      builder.start().waitFor() == 0
+    } catch {
+      case e: Exception => false
+    }
   }
 }





spark git commit: [SPARK-9743] [SQL] Fixes JSONRelation refreshing

2015-08-10 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 f75c64b0c -> 94b2f5b32


[SPARK-9743] [SQL] Fixes JSONRelation refreshing

PR #7696 added two `HadoopFsRelation.refresh()` calls ([this] [1], and [this] 
[2]) in `DataSourceStrategy` to make test case `InsertSuite.save directly to 
the path of a JSON table` pass. However, this forces every `HadoopFsRelation` 
table scan to do a refresh, which can be super expensive for tables with a 
large number of partitions.

The reason why the original test case fails without the `refresh()` calls is 
that, the old JSON relation builds the base RDD with the input paths, while 
`HadoopFsRelation` provides `FileStatus`es of leaf files. With the old JSON 
relation, we can create a temporary table based on a path, writing data to 
that, and then read newly written data without refreshing the table. This is no 
longer true for `HadoopFsRelation`.

This PR removes those two expensive refresh calls, and moves the refresh into 
`JSONRelation` to fix this issue. We might want to update `HadoopFsRelation` 
interface to provide better support for this use case.

[1]: 
https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L63
[2]: 
https://github.com/apache/spark/blob/ebfd91c542aaead343cb154277fcf9114382fee7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala#L91
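
A hedged sketch of the use case behind the original test (the path and the `initialDF`/`newDF` DataFrames are illustrative, not from the patch; `sqlContext` is assumed to be in scope):

```scala
// A temporary table defined on a path should pick up rows written directly to
// that path after the table was created; this is what the relation-level refresh
// in JSONRelation restores.
val path = "/tmp/json_table"

initialDF.write.json(path)                           // seed the path
sqlContext.read.json(path).registerTempTable("jt")   // temp table backed by the path
newDF.write.mode("overwrite").json(path)             // save directly to the path of the JSON table
sqlContext.sql("SELECT * FROM jt").collect()         // should reflect the newly written rows
```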

Author: Cheng Lian l...@databricks.com

Closes #8035 from liancheng/spark-9743/fix-json-relation-refreshing and 
squashes the following commits:

ec1957d [Cheng Lian] Fixes JSONRelation refreshing

(cherry picked from commit e3fef0f9e17b1766a3869cb80ce7e4cd521cb7b6)
Signed-off-by: Yin Huai yh...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94b2f5b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94b2f5b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94b2f5b3

Branch: refs/heads/branch-1.5
Commit: 94b2f5b3213553278ead376c24e63f019a18e793
Parents: f75c64b
Author: Cheng Lian l...@databricks.com
Authored: Mon Aug 10 09:07:08 2015 -0700
Committer: Yin Huai yh...@databricks.com
Committed: Mon Aug 10 09:07:20 2015 -0700

--
 .../datasources/DataSourceStrategy.scala |  2 --
 .../org/apache/spark/sql/json/JSONRelation.scala | 19 +++
 .../apache/spark/sql/sources/interfaces.scala|  2 +-
 .../apache/spark/sql/sources/InsertSuite.scala   | 10 +-
 4 files changed, 21 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5b5fa8c..78a4acd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -60,7 +60,6 @@ private[sql] object DataSourceStrategy extends Strategy with 
Logging {
 // Scanning partitioned HadoopFsRelation
 case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
HadoopFsRelation))
 if t.partitionSpec.partitionColumns.nonEmpty =>
-  t.refresh()
   val selectedPartitions = prunePartitions(filters, 
t.partitionSpec).toArray
 
   logInfo {
@@ -88,7 +87,6 @@ private[sql] object DataSourceStrategy extends Strategy with 
Logging {
 
 // Scanning non-partitioned HadoopFsRelation
 case PhysicalOperation(projects, filters, l @ LogicalRelation(t: 
HadoopFsRelation)) =>
-  t.refresh()
   // See buildPartitionedTableScan for the reason that we need to create a 
shard
   // broadcast HadoopConf.
   val sharedHadoopConf = SparkHadoopUtil.get.conf

http://git-wip-us.apache.org/repos/asf/spark/blob/94b2f5b3/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index b34a272..5bb9e62 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,20 +22,22 @@ import java.io.CharArrayWriter
 import com.fasterxml.jackson.core.JsonFactory
 import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{Text, 

spark git commit: Fixed AtmoicReference Example

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 3ee2c8d16 -> 39493b235


Fixed AtmoicReference Example

Author: Mahmoud Lababidi labab...@gmail.com

Closes #8076 from lababidi/master and squashes the following commits:

af4553b [Mahmoud Lababidi] Fixed AtmoicReference Example

(cherry picked from commit d285212756168200383bf4df2c951bd80a492a7c)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39493b23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39493b23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39493b23

Branch: refs/heads/branch-1.5
Commit: 39493b235ddaa2123e14629935712c43e4a88c87
Parents: 3ee2c8d
Author: Mahmoud Lababidi labab...@gmail.com
Authored: Mon Aug 10 13:02:01 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:02:12 2015 -0700

--
 docs/streaming-kafka-integration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39493b23/docs/streaming-kafka-integration.md
--
diff --git a/docs/streaming-kafka-integration.md 
b/docs/streaming-kafka-integration.md
index 775d508..7571e22 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -152,7 +152,7 @@ Next, we discuss how to use this approach in your streaming 
application.
 </div>
 <div data-lang="java" markdown="1">
        // Hold a reference to the current offset ranges, so it can be used downstream
-       final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference();
+       final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
 
        directKafkaStream.transformToPair(
          new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {





[1/2] spark git commit: Preparing Spark release v1.5.0-snapshot-20150810

2015-08-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d17303a94 -> e51779c17


Preparing Spark release v1.5.0-snapshot-20150810


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22031493
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22031493
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22031493

Branch: refs/heads/branch-1.5
Commit: 220314935e0aa3c910ac5e22a38e2b6882a5d1e8
Parents: d17303a
Author: Patrick Wendell pwend...@gmail.com
Authored: Mon Aug 10 13:56:50 2015 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Mon Aug 10 13:56:50 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 32 files changed, 32 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index e9c6d26..3ef7d6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index ed5c37e..684e07b 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 0e53a79..bb25652 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index e6884b0..9ef1eda 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 1318959..6377c3e 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22031493/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b

spark git commit: [SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe are enabled

2015-08-10 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 39493b235 -> d251d9ff0


[SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe 
are enabled

Exchange.isUnsafe should check whether codegen and unsafe are enabled.

Author: Josh Rosen joshro...@databricks.com

Closes #8073 from JoshRosen/SPARK-9784 and squashes the following commits:

7a1019f [Josh Rosen] [SPARK-9784] Exchange.isUnsafe should check whether 
codegen and unsafe are enabled

(cherry picked from commit 0fe66744f16854fc8cd8a72174de93a788e3cf6c)
Signed-off-by: Josh Rosen joshro...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d251d9ff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d251d9ff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d251d9ff

Branch: refs/heads/branch-1.5
Commit: d251d9ff000363665aa6faeb2199a48dc5970ca2
Parents: 39493b2
Author: Josh Rosen joshro...@databricks.com
Authored: Mon Aug 10 13:05:03 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Aug 10 13:05:24 2015 -0700

--
 .../src/main/scala/org/apache/spark/sql/execution/Exchange.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d251d9ff/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index b89e634..029f226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -46,7 +46,7 @@ case class Exchange(newPartitioning: Partitioning, child: 
SparkPlan) extends Una
* Returns true iff we can support the data type, and we are not doing range 
partitioning.
*/
   private lazy val tungstenMode: Boolean = {
-    GenerateUnsafeProjection.canSupport(child.schema) &&
+    unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) &&
       !newPartitioning.isInstanceOf[RangePartitioning]
   }
 





spark git commit: [SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe are enabled

2015-08-10 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master d28521275 -> 0fe66744f


[SPARK-9784] [SQL] Exchange.isUnsafe should check whether codegen and unsafe 
are enabled

Exchange.isUnsafe should check whether codegen and unsafe are enabled.

Author: Josh Rosen joshro...@databricks.com

Closes #8073 from JoshRosen/SPARK-9784 and squashes the following commits:

7a1019f [Josh Rosen] [SPARK-9784] Exchange.isUnsafe should check whether 
codegen and unsafe are enabled


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0fe66744
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0fe66744
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0fe66744

Branch: refs/heads/master
Commit: 0fe66744f16854fc8cd8a72174de93a788e3cf6c
Parents: d285212
Author: Josh Rosen joshro...@databricks.com
Authored: Mon Aug 10 13:05:03 2015 -0700
Committer: Josh Rosen joshro...@databricks.com
Committed: Mon Aug 10 13:05:03 2015 -0700

--
 .../src/main/scala/org/apache/spark/sql/execution/Exchange.scala   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0fe66744/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index b89e634..029f226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -46,7 +46,7 @@ case class Exchange(newPartitioning: Partitioning, child: 
SparkPlan) extends Una
* Returns true iff we can support the data type, and we are not doing range 
partitioning.
*/
   private lazy val tungstenMode: Boolean = {
-    GenerateUnsafeProjection.canSupport(child.schema) &&
+    unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) &&
       !newPartitioning.isInstanceOf[RangePartitioning]
   }
 





spark git commit: [SPARK-9633] [BUILD] SBT download locations outdated; need an update

2015-08-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 1c38d4254 -> e23843c06


[SPARK-9633] [BUILD] SBT download locations outdated; need an update

Remove 2 defunct SBT download URLs and replace with the 1 known download URL. 
Also, use https.
Follow up on https://github.com/apache/spark/pull/7792

Author: Sean Owen so...@cloudera.com

Closes #7956 from srowen/SPARK-9633 and squashes the following commits:

caa40bd [Sean Owen] Remove 2 defunct SBT download URLs and replace with the 1 
known download URL. Also, use https.

Conflicts:
sbt/sbt-launch-lib.bash


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e23843c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e23843c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e23843c0

Branch: refs/heads/branch-1.2
Commit: e23843c0603bb7d278ba21abde0dd92661b2fac8
Parents: 1c38d42
Author: Sean Owen so...@cloudera.com
Authored: Thu Aug 6 23:43:52 2015 +0100
Committer: Davies Liu davies@gmail.com
Committed: Mon Aug 10 13:10:40 2015 -0700

--
 sbt/sbt-launch-lib.bash | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e23843c0/sbt/sbt-launch-lib.bash
--
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index 055e206..eb6b415 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -38,8 +38,7 @@ dlog () {
 
 acquire_sbt_jar () {
   SBT_VERSION=`awk -F = '/sbt\\.version/ {print $2}' 
./project/build.properties`
-  URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
-  URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
   JAR=sbt/sbt-launch-${SBT_VERSION}.jar
 
   sbt_jar=$JAR
@@ -51,9 +50,9 @@ acquire_sbt_jar () {
     printf "Attempting to fetch sbt\n"
     JAR_DL=${JAR}.part
     if hash curl 2>/dev/null; then
-      (curl --silent ${URL1} > ${JAR_DL} || curl --silent ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      curl --fail --location --silent ${URL1} > ${JAR_DL} && mv ${JAR_DL} ${JAR}
     elif hash wget 2>/dev/null; then
-      (wget --quiet ${URL1} -O ${JAR_DL} || wget --quiet ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      wget --quiet ${URL1} -O ${JAR_DL} && mv ${JAR_DL} ${JAR}
     else
       printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
       exit -1





spark git commit: [SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large expressions

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 c1838e430 -> 23842482f


[SPARK-9620] [SQL] generated UnsafeProjection should support many columns or 
large expressions

Currently, generated UnsafeProjection can reach the 64KB bytecode limit of the 
JVM. This patch splits the generated expressions into multiple functions to 
avoid the limitation.

After this patch, we can work well with tables that have up to 64k columns (hitting 
the max number of constants limit in Java), which should be enough in practice.
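
A hedged sketch of the kind of workload this enables (assumes a `sqlContext` in scope, e.g. from spark-shell; the column count is illustrative):

```scala
import org.apache.spark.sql.functions.lit

// A projection over a very wide row. Before this patch, the single generated
// apply() method covering all of these columns could exceed the JVM's 64KB
// bytecode limit per method; with the generated code split into multiple
// functions, it evaluates normally.
val wideColumns = (0 until 3000).map(i => lit(i).as(s"c$i"))
val wide = sqlContext.range(1).select(wideColumns: _*)
wide.collect()
```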

cc rxin

Author: Davies Liu dav...@databricks.com

Closes #8044 from davies/wider_table and squashes the following commits:

9192e6c [Davies Liu] fix generated safe projection
d1ef81a [Davies Liu] fix failed tests
737b3d3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
wider_table
ffcd132 [Davies Liu] address comments
1b95be4 [Davies Liu] put the generated class into sql package
77ed72d [Davies Liu] address comments
4518e17 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
wider_table
75ccd01 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
wider_table
495e932 [Davies Liu] support wider table with more than 1k columns for 
generated projections

(cherry picked from commit fe2fb7fb7189d183a4273ad27514af4b6b461f26)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23842482
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23842482
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23842482

Branch: refs/heads/branch-1.5
Commit: 23842482f46b45d90ba32ce406675cdb5f88c537
Parents: c1838e4
Author: Davies Liu dav...@databricks.com
Authored: Mon Aug 10 13:52:18 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:52:27 2015 -0700

--
 .../expressions/codegen/CodeGenerator.scala |  48 +++-
 .../codegen/GenerateMutableProjection.scala |  43 +--
 .../codegen/GenerateSafeProjection.scala|  52 ++--
 .../codegen/GenerateUnsafeProjection.scala  | 122 ++-
 .../codegen/GenerateUnsafeRowJoiner.scala   |   2 +-
 .../codegen/GeneratedProjectionSuite.scala  |  82 +
 6 files changed, 207 insertions(+), 142 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/23842482/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7b41c9a..c21f4d6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.language.existentials
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
@@ -265,6 +266,45 @@ class CodeGenContext {
   def isPrimitiveType(jt: String): Boolean = primitiveTypes.contains(jt)
 
   def isPrimitiveType(dt: DataType): Boolean = isPrimitiveType(javaType(dt))
+
+  /**
+   * Splits the generated code of expressions into multiple functions, because 
function has
+   * 64kb code size limit in JVM
+   *
+   * @param row the variable name of row that is used by expressions
+   */
+  def splitExpressions(row: String, expressions: Seq[String]): String = {
+    val blocks = new ArrayBuffer[String]()
+    val blockBuilder = new StringBuilder()
+    for (code <- expressions) {
+      // We can't know how many byte code will be generated, so use the number of bytes as limit
+      if (blockBuilder.length > 64 * 1000) {
+        blocks.append(blockBuilder.toString())
+        blockBuilder.clear()
+      }
+      blockBuilder.append(code)
+    }
+    blocks.append(blockBuilder.toString())
+
+    if (blocks.length == 1) {
+      // inline execution if only one block
+      blocks.head
+    } else {
+      val apply = freshName("apply")
+      val functions = blocks.zipWithIndex.map { case (body, i) =>
+        val name = s"${apply}_$i"
+        val code = s"""
+           |private void $name(InternalRow $row) {
+           |  $body
+           |}
+         """.stripMargin
+        addNewFunction(name, code)
+        name
+      }
+
+      functions.map(name => s"$name($row);").mkString("\n")
+    }
+  }
 }
 
 /**
@@ -289,15 +329,15 @@ abstract class CodeGenerator[InType : AnyRef, OutType : 
AnyRef] extends Loggin
   protected def declareMutableStates(ctx: 

spark git commit: [SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large expressions

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 40ed2af58 -> fe2fb7fb7


[SPARK-9620] [SQL] generated UnsafeProjection should support many columns or 
large expressions

Currently, generated UnsafeProjection can reach the 64KB bytecode limit of the 
JVM. This patch splits the generated expressions into multiple functions to 
avoid the limitation.

After this patch, we can work well with tables that have up to 64k columns (hitting 
the max number of constants limit in Java), which should be enough in practice.

cc rxin

Author: Davies Liu dav...@databricks.com

Closes #8044 from davies/wider_table and squashes the following commits:

9192e6c [Davies Liu] fix generated safe projection
d1ef81a [Davies Liu] fix failed tests
737b3d3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
wider_table
ffcd132 [Davies Liu] address comments
1b95be4 [Davies Liu] put the generated class into sql package
77ed72d [Davies Liu] address comments
4518e17 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
wider_table
75ccd01 [Davies Liu] Merge branch 'master' of github.com:apache/spark into 
wider_table
495e932 [Davies Liu] support wider table with more than 1k columns for 
generated projections


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe2fb7fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe2fb7fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe2fb7fb

Branch: refs/heads/master
Commit: fe2fb7fb7189d183a4273ad27514af4b6b461f26
Parents: 40ed2af
Author: Davies Liu dav...@databricks.com
Authored: Mon Aug 10 13:52:18 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:52:18 2015 -0700

--
 .../expressions/codegen/CodeGenerator.scala |  48 +++-
 .../codegen/GenerateMutableProjection.scala |  43 +--
 .../codegen/GenerateSafeProjection.scala|  52 ++--
 .../codegen/GenerateUnsafeProjection.scala  | 122 ++-
 .../codegen/GenerateUnsafeRowJoiner.scala   |   2 +-
 .../codegen/GeneratedProjectionSuite.scala  |  82 +
 6 files changed, 207 insertions(+), 142 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fe2fb7fb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7b41c9a..c21f4d6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.language.existentials
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
@@ -265,6 +266,45 @@ class CodeGenContext {
   def isPrimitiveType(jt: String): Boolean = primitiveTypes.contains(jt)
 
   def isPrimitiveType(dt: DataType): Boolean = isPrimitiveType(javaType(dt))
+
+  /**
+   * Splits the generated code of expressions into multiple functions, because 
function has
+   * 64kb code size limit in JVM
+   *
+   * @param row the variable name of row that is used by expressions
+   */
+  def splitExpressions(row: String, expressions: Seq[String]): String = {
+    val blocks = new ArrayBuffer[String]()
+    val blockBuilder = new StringBuilder()
+    for (code <- expressions) {
+      // We can't know how many byte code will be generated, so use the number of bytes as limit
+      if (blockBuilder.length > 64 * 1000) {
+        blocks.append(blockBuilder.toString())
+        blockBuilder.clear()
+      }
+      blockBuilder.append(code)
+    }
+    blocks.append(blockBuilder.toString())
+
+    if (blocks.length == 1) {
+      // inline execution if only one block
+      blocks.head
+    } else {
+      val apply = freshName("apply")
+      val functions = blocks.zipWithIndex.map { case (body, i) =>
+        val name = s"${apply}_$i"
+        val code = s"""
+           |private void $name(InternalRow $row) {
+           |  $body
+           |}
+         """.stripMargin
+        addNewFunction(name, code)
+        name
+      }
+
+      functions.map(name => s"$name($row);").mkString("\n")
+    }
+  }
 }
 
 /**
@@ -289,15 +329,15 @@ abstract class CodeGenerator[InType : AnyRef, OutType : 
AnyRef] extends Loggin
   protected def declareMutableStates(ctx: CodeGenContext): String = {
     ctx.mutableStates.map { case (javaType, variableName, _) =>
       s"private $javaType $variableName;"
-

spark git commit: [SPARK-9633] [BUILD] SBT download locations outdated; need an update

2015-08-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 d304ef4a9 -> 117843f85


[SPARK-9633] [BUILD] SBT download locations outdated; need an update

Remove 2 defunct SBT download URLs and replace with the 1 known download URL. 
Also, use https.
Follow up on https://github.com/apache/spark/pull/7792

Author: Sean Owen so...@cloudera.com

Closes #7956 from srowen/SPARK-9633 and squashes the following commits:

caa40bd [Sean Owen] Remove 2 defunct SBT download URLs and replace with the 1 
known download URL. Also, use https.

Conflicts:
sbt/sbt-launch-lib.bash


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/117843f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/117843f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/117843f8

Branch: refs/heads/branch-1.0
Commit: 117843f85e8e69a43ffa531716a37c8c05dbabb0
Parents: d304ef4
Author: Sean Owen so...@cloudera.com
Authored: Thu Aug 6 23:43:52 2015 +0100
Committer: Davies Liu davies@gmail.com
Committed: Mon Aug 10 13:15:41 2015 -0700

--
 sbt/sbt-launch-lib.bash | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/117843f8/sbt/sbt-launch-lib.bash
--
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index 64e40a8..2fbb060 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -37,8 +37,7 @@ dlog () {
 
acquire_sbt_jar () {
  SBT_VERSION=`awk -F "=" '/sbt\.version/ {print $2}' ./project/build.properties`
-  URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
-  URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
  JAR=sbt/sbt-launch-${SBT_VERSION}.jar

  sbt_jar=$JAR
@@ -50,9 +49,9 @@ acquire_sbt_jar () {
    printf "Attempting to fetch sbt\n"
    JAR_DL="${JAR}.part"
    if hash curl 2>/dev/null; then
-      (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      curl --progress-bar --fail --location ${URL1} > ${JAR_DL} && mv ${JAR_DL} ${JAR}
    elif hash wget 2>/dev/null; then
-      (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      wget --progress=bar ${URL1} -O ${JAR_DL} && mv ${JAR_DL} ${JAR}
    else
      printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
      exit -1


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType)

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fe2fb7fb7 - c4fd2a242


[SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType)

This patch optimizes two things:

1. Passing a MathContext to JavaBigDecimal.multiply/divide/remainder to do the rounding
correctly, because java.math.BigDecimal.apply(MathContext) is expensive

2. Casting integer/short/byte to decimal directly (without going through double)

These two optimizations speed up the end-to-end time of an aggregation
(SUM(short * decimal(5, 2))) by 75% (from 19s to 10.8s)
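
As a hedged illustration of optimization 1 (not the patch itself), the snippet below passes an explicit MathContext into java.math.BigDecimal.multiply so the product is rounded to the target precision in one step instead of being rounded afterwards; the precision and rounding mode are assumptions chosen for the example. It runs as-is in the Scala REPL.

import java.math.{BigDecimal => JavaBigDecimal, MathContext, RoundingMode}

// Round inside multiply() itself rather than producing the full-precision product first.
val mc = new MathContext(38, RoundingMode.HALF_UP)  // 38 significant digits, assumed limit
val a = new JavaBigDecimal("123.45")
val b = new JavaBigDecimal("0.3333333333333333333333333333")
println(a.multiply(b, mc))        // rounded once, during the multiply
println(a.multiply(b).round(mc))  // the two-step alternative this optimization avoids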

Author: Davies Liu dav...@databricks.com

Closes #8052 from davies/optimize_decimal and squashes the following commits:

225efad [Davies Liu] improve decimal.times() and cast(int, decimalType)


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4fd2a24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4fd2a24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4fd2a24

Branch: refs/heads/master
Commit: c4fd2a242228ee101904770446e3f37d49e39b76
Parents: fe2fb7f
Author: Davies Liu dav...@databricks.com
Authored: Mon Aug 10 13:55:11 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:55:11 2015 -0700

--
 .../spark/sql/catalyst/expressions/Cast.scala   | 42 +++-
 .../org/apache/spark/sql/types/Decimal.scala| 12 +++---
 2 files changed, 22 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c4fd2a24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 946c5a9..616b9e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -155,7 +155,7 @@ case class Cast(child: Expression, dataType: DataType)
     case ByteType =>
       buildCast[Byte](_, _ != 0)
     case DecimalType() =>
-      buildCast[Decimal](_, _ != Decimal.ZERO)
+      buildCast[Decimal](_, !_.isZero)
     case DoubleType =>
       buildCast[Double](_, _ != 0)
     case FloatType =>
@@ -315,13 +315,13 @@ case class Cast(child: Expression, dataType: DataType)
     case TimestampType =>
       // Note that we lose precision here.
       buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
-    case DecimalType() =>
+    case dt: DecimalType =>
       b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
-    case LongType =>
-      b => changePrecision(Decimal(b.asInstanceOf[Long]), target)
-    case x: NumericType => // All other numeric types can be represented precisely as Doubles
+    case t: IntegralType =>
+      b => changePrecision(Decimal(t.integral.asInstanceOf[Integral[Any]].toLong(b)), target)
+    case x: FractionalType =>
       b => try {
-        changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), target)
+        changePrecision(Decimal(x.fractional.asInstanceOf[Fractional[Any]].toDouble(b)), target)
       } catch {
         case _: NumberFormatException => null
       }
@@ -534,10 +534,7 @@ case class Cast(child: Expression, dataType: DataType)
       (c, evPrim, evNull) =>
         s"""
           try {
-            org.apache.spark.sql.types.Decimal tmpDecimal =
-              new org.apache.spark.sql.types.Decimal().set(
-                new scala.math.BigDecimal(
-                  new java.math.BigDecimal($c.toString())));
+            Decimal tmpDecimal = Decimal.apply(new java.math.BigDecimal($c.toString()));
             ${changePrecision("tmpDecimal", target, evPrim, evNull)}
           } catch (java.lang.NumberFormatException e) {
             $evNull = true;
@@ -546,12 +543,7 @@ case class Cast(child: Expression, dataType: DataType)
     case BooleanType =>
       (c, evPrim, evNull) =>
         s"""
-          org.apache.spark.sql.types.Decimal tmpDecimal = null;
-          if ($c) {
-            tmpDecimal = new org.apache.spark.sql.types.Decimal().set(1);
-          } else {
-            tmpDecimal = new org.apache.spark.sql.types.Decimal().set(0);
-          }
+          Decimal tmpDecimal = $c ? Decimal.apply(1) : Decimal.apply(0);
           ${changePrecision("tmpDecimal", target, evPrim, evNull)}
         """
     case DateType =>
@@ -561,32 +553,28 @@ case class Cast(child: Expression, dataType: DataType)
         // Note that we lose precision here.
         (c, evPrim, evNull) =>
           s"""
-            org.apache.spark.sql.types.Decimal tmpDecimal =
-              new org.apache.spark.sql.types.Decimal().set(
-

spark git commit: [SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType)

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 23842482f - d17303a94


[SPARK-9759] [SQL] improve decimal.times() and cast(int, decimalType)

This patch optimizes two things:

1. Passing a MathContext to JavaBigDecimal.multiply/divide/remainder to do the rounding
correctly, because java.math.BigDecimal.apply(MathContext) is expensive

2. Casting integer/short/byte to decimal directly (without going through double)

These two optimizations speed up the end-to-end time of an aggregation
(SUM(short * decimal(5, 2))) by 75% (from 19s to 10.8s)
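
As a hedged illustration of optimization 2, the snippet below uses only the Decimal.apply factories that appear in the diff: integral values are turned into a Decimal directly instead of taking a detour through Double. It assumes spark-catalyst 1.5 on the classpath and is meant for the Scala REPL.

import org.apache.spark.sql.types.Decimal

// Direct path now taken for byte/short/int: exact, no intermediate Double.
val direct = Decimal.apply(42)
// Old path for non-long numeric types: widen to Double first, then build the Decimal.
val viaDouble = Decimal.apply(42.toDouble)
println(direct)
println(viaDouble)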

Author: Davies Liu dav...@databricks.com

Closes #8052 from davies/optimize_decimal and squashes the following commits:

225efad [Davies Liu] improve decimal.times() and cast(int, decimalType)

(cherry picked from commit c4fd2a242228ee101904770446e3f37d49e39b76)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d17303a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d17303a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d17303a9

Branch: refs/heads/branch-1.5
Commit: d17303a94820ded970030968006ecabe76820278
Parents: 2384248
Author: Davies Liu dav...@databricks.com
Authored: Mon Aug 10 13:55:11 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:55:19 2015 -0700

--
 .../spark/sql/catalyst/expressions/Cast.scala   | 42 +++-
 .../org/apache/spark/sql/types/Decimal.scala| 12 +++---
 2 files changed, 22 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d17303a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 946c5a9..616b9e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -155,7 +155,7 @@ case class Cast(child: Expression, dataType: DataType)
     case ByteType =>
       buildCast[Byte](_, _ != 0)
     case DecimalType() =>
-      buildCast[Decimal](_, _ != Decimal.ZERO)
+      buildCast[Decimal](_, !_.isZero)
     case DoubleType =>
       buildCast[Double](_, _ != 0)
     case FloatType =>
@@ -315,13 +315,13 @@ case class Cast(child: Expression, dataType: DataType)
     case TimestampType =>
       // Note that we lose precision here.
       buildCast[Long](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
-    case DecimalType() =>
+    case dt: DecimalType =>
       b => changePrecision(b.asInstanceOf[Decimal].clone(), target)
-    case LongType =>
-      b => changePrecision(Decimal(b.asInstanceOf[Long]), target)
-    case x: NumericType => // All other numeric types can be represented precisely as Doubles
+    case t: IntegralType =>
+      b => changePrecision(Decimal(t.integral.asInstanceOf[Integral[Any]].toLong(b)), target)
+    case x: FractionalType =>
       b => try {
-        changePrecision(Decimal(x.numeric.asInstanceOf[Numeric[Any]].toDouble(b)), target)
+        changePrecision(Decimal(x.fractional.asInstanceOf[Fractional[Any]].toDouble(b)), target)
       } catch {
         case _: NumberFormatException => null
       }
@@ -534,10 +534,7 @@ case class Cast(child: Expression, dataType: DataType)
       (c, evPrim, evNull) =>
         s"""
           try {
-            org.apache.spark.sql.types.Decimal tmpDecimal =
-              new org.apache.spark.sql.types.Decimal().set(
-                new scala.math.BigDecimal(
-                  new java.math.BigDecimal($c.toString())));
+            Decimal tmpDecimal = Decimal.apply(new java.math.BigDecimal($c.toString()));
             ${changePrecision("tmpDecimal", target, evPrim, evNull)}
           } catch (java.lang.NumberFormatException e) {
             $evNull = true;
@@ -546,12 +543,7 @@ case class Cast(child: Expression, dataType: DataType)
     case BooleanType =>
       (c, evPrim, evNull) =>
         s"""
-          org.apache.spark.sql.types.Decimal tmpDecimal = null;
-          if ($c) {
-            tmpDecimal = new org.apache.spark.sql.types.Decimal().set(1);
-          } else {
-            tmpDecimal = new org.apache.spark.sql.types.Decimal().set(0);
-          }
+          Decimal tmpDecimal = $c ? Decimal.apply(1) : Decimal.apply(0);
           ${changePrecision("tmpDecimal", target, evPrim, evNull)}
         """
     case DateType =>
@@ -561,32 +553,28 @@ case class Cast(child: Expression, dataType: DataType)
         // Note that we lose precision here.
         (c, evPrim, evNull) =>
           s"""
-

[05/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
new file mode 100644
index 000..6b62c9a
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+trait TestJsonData {
+
+  protected def ctx: SQLContext
+
+  def primitiveFieldAndType: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"string":"this is a simple string.",
+          "integer":10,
+          "long":21474836470,
+          "bigInteger":92233720368547758070,
+          "double":1.7976931348623157E308,
+          "boolean":true,
+          "null":null
+      }"""  :: Nil)
+
+  def primitiveFieldValueTypeConflict: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
+          "num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
+      """{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
+          "num_bool":12, "num_str":null, "str_bool":true}""" ::
+      """{"num_num_1":21474836470, "num_num_2":92233720368547758070, "num_num_3": 100,
+          "num_bool":false, "num_str":"str1", "str_bool":false}""" ::
+      """{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
+          "num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
+
+  def jsonNullStruct: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" ::
+        """{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
+        """{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
+        """{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
+
+  def complexFieldValueTypeConflict: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"num_struct":11, "str_array":[1, 2, 3],
+          "array":[], "struct_array":[], "struct": {}}""" ::
+      """{"num_struct":{"field":false}, "str_array":null,
+          "array":null, "struct_array":{}, "struct": null}""" ::
+      """{"num_struct":null, "str_array":"str",
+          "array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
+      """{"num_struct":{}, "str_array":["str1", "str2", 33],
+          "array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
+
+  def arrayElementTypeConflict: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
+          "array2": [{"field":214748364700}, {"field":1}]}""" ::
+      """{"array3": [{"field":"str"}, {"field":1}]}""" ::
+      """{"array3": [1, 2, 3]}""" :: Nil)
+
+  def missingFields: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"a":true}""" ::
+      """{"b":21474836470}""" ::
+      """{"c":[33, 44]}""" ::
+      """{"d":{"field":true}}""" ::
+      """{"e":"str"}""" :: Nil)
+
+  def complexFieldAndType1: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"struct":{"field1": true, "field2": 92233720368547758070},
+          "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
+          "arrayOfString":["str1", "str2"],
+          "arrayOfInteger":[1, 2147483647, -2147483648],
+          "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
+          "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
+          "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
+          "arrayOfBoolean":[true, false, true],
+          "arrayOfNull":[null, null, null, null],
+          "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
+          "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
+          "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
+         }"""  :: Nil)
+
+  def complexFieldAndType2: RDD[String] =
+    ctx.sparkContext.parallelize(
+      """{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
+          "complexArrayOfStruct": [
+          {
+            "field1": [
+
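
As a usage note, a suite that mixes in this trait can materialize any of these fixtures into a DataFrame through the SQLContext JSON reader. A minimal sketch, assuming it runs inside such a suite where ctx and primitiveFieldAndType are in scope:

// Schema is inferred from the JSON records in the fixture RDD.
val df = ctx.read.json(primitiveFieldAndType)
df.printSchema()
df.show()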

[2/2] spark git commit: Preparing development version 1.5.0-SNAPSHOT

2015-08-10 Thread pwendell
Preparing development version 1.5.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e51779c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e51779c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e51779c1

Branch: refs/heads/branch-1.5
Commit: e51779c17877a5c715ff26d6b174c6535fb20a4c
Parents: 2203149
Author: Patrick Wendell pwend...@gmail.com
Authored: Mon Aug 10 13:56:56 2015 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Mon Aug 10 13:56:56 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 32 files changed, 32 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 3ef7d6f..e9c6d26 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 684e07b..ed5c37e 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index bb25652..0e53a79 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9ef1eda..e6884b0 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 6377c3e..1318959 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e51779c1/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 7d72f78..0664cfb 100644
--- 

Git Push Summary

2015-08-10 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.5.0-snapshot-20150810 [created] 220314935

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



Git Push Summary

2015-08-10 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.5.0-snapshot-20150803 [deleted] 7e7147f3b

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[2/2] spark git commit: Preparing development version 1.5.0-SNAPSHOT

2015-08-10 Thread pwendell
Preparing development version 1.5.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e4f58ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e4f58ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e4f58ed

Branch: refs/heads/branch-1.5
Commit: 0e4f58ed9b63b7eade96b8e4b2c087b182915b40
Parents: 3369ad9
Author: Patrick Wendell pwend...@gmail.com
Authored: Mon Aug 10 14:26:56 2015 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Mon Aug 10 14:26:56 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 32 files changed, 32 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 3ef7d6f..e9c6d26 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 684e07b..ed5c37e 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index bb25652..0e53a79 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 9ef1eda..e6884b0 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 6377c3e..1318959 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0</version>
+    <version>1.5.0-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e4f58ed/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b/external/flume-sink/pom.xml
index 7d72f78..0664cfb 100644
--- 

[1/2] spark git commit: Preparing Spark release v1.5.0-snapshot-20150810

2015-08-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e51779c17 - 0e4f58ed9


Preparing Spark release v1.5.0-snapshot-20150810


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3369ad9b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3369ad9b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3369ad9b

Branch: refs/heads/branch-1.5
Commit: 3369ad9bcba73737b5b055a4e76b18f2a4da2c5d
Parents: e51779c
Author: Patrick Wendell pwend...@gmail.com
Authored: Mon Aug 10 14:26:49 2015 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Mon Aug 10 14:26:49 2015 -0700

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 32 files changed, 32 insertions(+), 32 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index e9c6d26..3ef7d6f 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index ed5c37e..684e07b 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 0e53a79..bb25652 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index e6884b0..9ef1eda 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 1318959..6377c3e 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-parent_2.10</artifactId>
-    <version>1.5.0-SNAPSHOT</version>
+    <version>1.5.0</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3369ad9b/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml b

Git Push Summary

2015-08-10 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.5.0-snapshot-20150810 [created] 3369ad9bc

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9737] [YARN] Add the suggested configuration when required executor memory is above the max threshold of this cluster on YARN mode

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 0e4f58ed9 - 51406becc


[SPARK-9737] [YARN] Add the suggested configuration when required executor 
memory is above the max threshold of this cluster on YARN mode
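
For illustration only, here is a rough Scala sketch of the check behind this message, with assumed numbers: maxMem corresponds to YARN's yarn.scheduler.maximum-allocation-mb, and the executor memory overhead in Spark 1.5 defaults to roughly max(384 MB, 10% of the executor memory); treat that formula as an assumption rather than a guarantee.

// Not Spark code: a standalone, REPL-friendly version of the validation.
val executorMemoryMb = 8192                                      // --executor-memory 8g
val overheadMb = math.max(384, (0.10 * executorMemoryMb).toInt)  // assumed default overhead rule
val maxAllocationMb = 16384                                      // yarn.scheduler.maximum-allocation-mb
require(executorMemoryMb + overheadMb <= maxAllocationMb,
  s"Required executor memory (${executorMemoryMb}+$overheadMb MB) is above the max threshold " +
    s"($maxAllocationMb MB); increase yarn.scheduler.maximum-allocation-mb or lower --executor-memory.")
// Dropping maxAllocationMb below executorMemoryMb + overheadMb reproduces the error above.
println("executor request fits within " + maxAllocationMb + " MB")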

Author: Yadong Qi qiyadong2...@gmail.com

Closes #8028 from watermen/SPARK-9737 and squashes the following commits:

48bdf3d [Yadong Qi] Add suggested configuration.

(cherry picked from commit 86fa4ba6d13f909cb508b7cb3b153d586fe59bc3)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51406bec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51406bec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51406bec

Branch: refs/heads/branch-1.5
Commit: 51406becc78857dbc36c3330f3afe8dd5926dc2f
Parents: 0e4f58e
Author: Yadong Qi qiyadong2...@gmail.com
Authored: Sun Aug 9 19:54:05 2015 +0100
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 15:40:01 2015 -0700

--
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51406bec/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
--
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index fc11bbf..b4ba3f0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -203,12 +203,14 @@ private[spark] class Client(
     val executorMem = args.executorMemory + executorMemoryOverhead
     if (executorMem > maxMem) {
       throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
-        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
+        s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
+        "Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
     }
     val amMem = args.amMemory + amMemoryOverhead
     if (amMem > maxMem) {
       throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
-        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
+        s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster! " +
+        "Please increase the value of 'yarn.scheduler.maximum-allocation-mb'.")
     }
     logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
       amMem,


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python

2015-08-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master c4fd2a242 - 853809e94


[SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python

This PR is based on #4229, thanks prabeesh.

Closes #4229
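
For context, a minimal Scala counterpart of the Python word count this PR adds (the Python API wraps the same MQTTUtils receiver); the broker URL and topic below are placeholders, and the sketch assumes spark-streaming-mqtt 1.5 on the classpath.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt.MQTTUtils

object MQTTWordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MQTTWordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(2))
    // Each MQTT message published on the topic arrives as one String record.
    val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "some/topic")
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}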

Author: Prabeesh K prabsma...@gmail.com
Author: zsxwing zsxw...@gmail.com
Author: prabs prabsma...@gmail.com
Author: Prabeesh K prabees...@namshi.com

Closes #7833 from zsxwing/pr4229 and squashes the following commits:

9570bec [zsxwing] Fix the variable name and check null in finally
4a9c79e [zsxwing] Fix pom.xml indentation
abf5f18 [zsxwing] Merge branch 'master' into pr4229
935615c [zsxwing] Fix the flaky MQTT tests
47278c5 [zsxwing] Include the project class files
478f844 [zsxwing] Add unpack
5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT 
tests
734db99 [zsxwing] Merge branch 'master' into pr4229
126608a [Prabeesh K] address the comments
b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229
d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; 
Revert unncessary changes; fix the python unit test
a6747cb [Prabeesh K] wait for starting the receiver before publishing data
87fc677 [Prabeesh K] address the comments:
97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt
80474d1 [Prabeesh K] fix
1f0cfe9 [Prabeesh K] python style fix
e1ee016 [Prabeesh K] scala style fix
a5a8f9f [Prabeesh K] added Python test
9767d82 [Prabeesh K] implemented Python-friendly class
a11968b [Prabeesh K] fixed python style
795ec27 [Prabeesh K] address comments
ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly
3f4df12 [Prabeesh K] updated version
b34c3c1 [prabs] adress comments
3aa7fff [prabs] Added Python streaming mqtt word count example
b7d42ff [prabs] Mqtt streaming support in Python


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/853809e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/853809e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/853809e9

Branch: refs/heads/master
Commit: 853809e948e7c5092643587a30738115b6591a59
Parents: c4fd2a2
Author: Prabeesh K prabsma...@gmail.com
Authored: Mon Aug 10 16:33:23 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Mon Aug 10 16:33:23 2015 -0700

--
 dev/run-tests.py|   2 +
 dev/sparktestsupport/modules.py |   2 +
 docs/streaming-programming-guide.md |   2 +-
 .../src/main/python/streaming/mqtt_wordcount.py |  58 +
 external/mqtt-assembly/pom.xml  | 102 
 external/mqtt/pom.xml   |  28 +
 external/mqtt/src/main/assembly/assembly.xml|  44 +++
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  16 +++
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 118 +++
 .../spark/streaming/mqtt/MQTTTestUtils.scala| 111 +
 pom.xml |   1 +
 project/SparkBuild.scala|  12 +-
 python/pyspark/streaming/mqtt.py|  72 +++
 python/pyspark/streaming/tests.py   | 106 -
 14 files changed, 565 insertions(+), 109 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/dev/run-tests.py
--
diff --git a/dev/run-tests.py b/dev/run-tests.py
index d1852b9..f689425 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version):
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
                  "streaming-flume-assembly/assembly",
+                 "streaming-mqtt-assembly/assembly",
+                 "streaming-mqtt/test:assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/dev/sparktestsupport/modules.py
--
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index a9717ff..d82c0cc 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -181,6 +181,7 @@ streaming_mqtt = Module(
     dependencies=[streaming],
     source_file_regexes=[
         "external/mqtt",
+        "external/mqtt-assembly",
     ],
     sbt_test_goals=[
         "streaming-mqtt/test",
@@ -306,6 +307,7 @@ pyspark_streaming = Module(
         streaming,
         streaming_kafka,
         streaming_flume_assembly,
+        streaming_mqtt,
         streaming_kinesis_asl
     ],
     source_file_regexes=[

http://git-wip-us.apache.org/repos/asf/spark/blob/853809e9/docs/streaming-programming-guide.md

spark git commit: [SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python

2015-08-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 51406becc - 8f4014fda


[SPARK-5155] [PYSPARK] [STREAMING] Mqtt streaming support in Python

This PR is based on #4229, thanks prabeesh.

Closes #4229

Author: Prabeesh K prabsma...@gmail.com
Author: zsxwing zsxw...@gmail.com
Author: prabs prabsma...@gmail.com
Author: Prabeesh K prabees...@namshi.com

Closes #7833 from zsxwing/pr4229 and squashes the following commits:

9570bec [zsxwing] Fix the variable name and check null in finally
4a9c79e [zsxwing] Fix pom.xml indentation
abf5f18 [zsxwing] Merge branch 'master' into pr4229
935615c [zsxwing] Fix the flaky MQTT tests
47278c5 [zsxwing] Include the project class files
478f844 [zsxwing] Add unpack
5f8a1d4 [zsxwing] Make the maven build generate the test jar for Python MQTT 
tests
734db99 [zsxwing] Merge branch 'master' into pr4229
126608a [Prabeesh K] address the comments
b90b709 [Prabeesh K] Merge pull request #1 from zsxwing/pr4229
d07f454 [zsxwing] Register StreamingListerner before starting StreamingContext; 
Revert unncessary changes; fix the python unit test
a6747cb [Prabeesh K] wait for starting the receiver before publishing data
87fc677 [Prabeesh K] address the comments:
97244ec [zsxwing] Make sbt build the assembly test jar for streaming mqtt
80474d1 [Prabeesh K] fix
1f0cfe9 [Prabeesh K] python style fix
e1ee016 [Prabeesh K] scala style fix
a5a8f9f [Prabeesh K] added Python test
9767d82 [Prabeesh K] implemented Python-friendly class
a11968b [Prabeesh K] fixed python style
795ec27 [Prabeesh K] address comments
ee387ae [Prabeesh K] Fix assembly jar location of mqtt-assembly
3f4df12 [Prabeesh K] updated version
b34c3c1 [prabs] adress comments
3aa7fff [prabs] Added Python streaming mqtt word count example
b7d42ff [prabs] Mqtt streaming support in Python

(cherry picked from commit 853809e948e7c5092643587a30738115b6591a59)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f4014fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f4014fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f4014fd

Branch: refs/heads/branch-1.5
Commit: 8f4014fdaf22dd8a3bd4728987c76c11d79e07d9
Parents: 51406be
Author: Prabeesh K prabsma...@gmail.com
Authored: Mon Aug 10 16:33:23 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Mon Aug 10 16:33:34 2015 -0700

--
 dev/run-tests.py|   2 +
 dev/sparktestsupport/modules.py |   2 +
 docs/streaming-programming-guide.md |   2 +-
 .../src/main/python/streaming/mqtt_wordcount.py |  58 +
 external/mqtt-assembly/pom.xml  | 102 
 external/mqtt/pom.xml   |  28 +
 external/mqtt/src/main/assembly/assembly.xml|  44 +++
 .../apache/spark/streaming/mqtt/MQTTUtils.scala |  16 +++
 .../spark/streaming/mqtt/MQTTStreamSuite.scala  | 118 +++
 .../spark/streaming/mqtt/MQTTTestUtils.scala| 111 +
 pom.xml |   1 +
 project/SparkBuild.scala|  12 +-
 python/pyspark/streaming/mqtt.py|  72 +++
 python/pyspark/streaming/tests.py   | 106 -
 14 files changed, 565 insertions(+), 109 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8f4014fd/dev/run-tests.py
--
diff --git a/dev/run-tests.py b/dev/run-tests.py
index d1852b9..f689425 100755
--- a/dev/run-tests.py
+++ b/dev/run-tests.py
@@ -303,6 +303,8 @@ def build_spark_sbt(hadoop_version):
                  "assembly/assembly",
                  "streaming-kafka-assembly/assembly",
                  "streaming-flume-assembly/assembly",
+                 "streaming-mqtt-assembly/assembly",
+                 "streaming-mqtt/test:assembly",
                  "streaming-kinesis-asl-assembly/assembly"]
     profiles_and_goals = build_profiles + sbt_goals
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8f4014fd/dev/sparktestsupport/modules.py
--
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index a9717ff..d82c0cc 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -181,6 +181,7 @@ streaming_mqtt = Module(
     dependencies=[streaming],
     source_file_regexes=[
         "external/mqtt",
+        "external/mqtt-assembly",
     ],
     sbt_test_goals=[
         "streaming-mqtt/test",
@@ -306,6 +307,7 @@ pyspark_streaming = Module(
         streaming,
         streaming_kafka,
         streaming_flume_assembly,
+        streaming_mqtt,
         streaming_kinesis_asl
 

spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

2015-08-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 4b5bbc589 - 6dde38026


[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

Spark Streaming deletes the temp file and backup files without checking whether they exist.

Author: Hao Zhu viadea...@gmail.com

Closes #8082 from viadea/master and squashes the following commits:

242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of 
those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before 
deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801

(cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dde3802
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dde3802
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dde3802

Branch: refs/heads/branch-1.4
Commit: 6dde38026113d8f83190e801a0f889c53bbc316d
Parents: 4b5bbc5
Author: Hao Zhu viadea...@gmail.com
Authored: Mon Aug 10 17:17:22 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Mon Aug 10 17:17:47 2015 -0700

--
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala   | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6dde3802/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5279331..bd117ed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -189,7 +189,9 @@ class CheckpointWriter(
 + ')
 
   // Write checkpoint to temp file
-  fs.delete(tempFile, true)   // just in case it exists
+  if (fs.exists(tempFile)) {
+fs.delete(tempFile, true)   // just in case it exists
+  }
   val fos = fs.create(tempFile)
   Utils.tryWithSafeFinally {
 fos.write(bytes)
@@ -200,7 +202,9 @@ class CheckpointWriter(
   // If the checkpoint file exists, back it up
   // If the backup exists as well, just delete it, otherwise rename 
will fail
   if (fs.exists(checkpointFile)) {
-fs.delete(backupFile, true) // just in case it exists
+if (fs.exists(backupFile)){
+  fs.delete(backupFile, true) // just in case it exists
+}
 if (!fs.rename(checkpointFile, backupFile)) {
      logWarning("Could not rename " + checkpointFile + " to " + backupFile)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

2015-08-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 8f4014fda - 94692bb14


[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

Spark Streaming deletes the temp file and backup files without checking whether they exist.

Author: Hao Zhu viadea...@gmail.com

Closes #8082 from viadea/master and squashes the following commits:

242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of 
those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before 
deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801

(cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94692bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94692bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94692bb1

Branch: refs/heads/branch-1.5
Commit: 94692bb14f75b814aab00bc43f15550e26ada6f1
Parents: 8f4014f
Author: Hao Zhu viadea...@gmail.com
Authored: Mon Aug 10 17:17:22 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Mon Aug 10 17:17:33 2015 -0700

--
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala   | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/94692bb1/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2780d5b..6f6b449 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -192,7 +192,9 @@ class CheckpointWriter(
 + ')
 
   // Write checkpoint to temp file
-  fs.delete(tempFile, true)   // just in case it exists
+  if (fs.exists(tempFile)) {
+fs.delete(tempFile, true)   // just in case it exists
+  }
   val fos = fs.create(tempFile)
   Utils.tryWithSafeFinally {
 fos.write(bytes)
@@ -203,7 +205,9 @@ class CheckpointWriter(
   // If the checkpoint file exists, back it up
   // If the backup exists as well, just delete it, otherwise rename 
will fail
   if (fs.exists(checkpointFile)) {
-fs.delete(backupFile, true) // just in case it exists
+if (fs.exists(backupFile)){
+  fs.delete(backupFile, true) // just in case it exists
+}
 if (!fs.rename(checkpointFile, backupFile)) {
      logWarning("Could not rename " + checkpointFile + " to " + backupFile)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

2015-08-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 853809e94 - 3c9802d94


[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

Spark Streaming deletes the temp file and backup files without checking whether they exist.
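
A minimal sketch of the guarded-delete pattern this change introduces, written against the Hadoop FileSystem API; the configuration and path below are placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Only call delete() when the file is actually there, mirroring the diff below.
val fs = FileSystem.get(new Configuration())
val tempFile = new Path("/tmp/checkpoint-temp")  // placeholder path
if (fs.exists(tempFile)) {
  fs.delete(tempFile, true)
}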

Author: Hao Zhu viadea...@gmail.com

Closes #8082 from viadea/master and squashes the following commits:

242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of 
those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before 
deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c9802d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c9802d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c9802d9

Branch: refs/heads/master
Commit: 3c9802d9400bea802984456683b2736a450ee17e
Parents: 853809e
Author: Hao Zhu viadea...@gmail.com
Authored: Mon Aug 10 17:17:22 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Mon Aug 10 17:17:22 2015 -0700

--
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala   | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c9802d9/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 2780d5b..6f6b449 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -192,7 +192,9 @@ class CheckpointWriter(
 + ')
 
   // Write checkpoint to temp file
-  fs.delete(tempFile, true)   // just in case it exists
+  if (fs.exists(tempFile)) {
+fs.delete(tempFile, true)   // just in case it exists
+  }
   val fos = fs.create(tempFile)
   Utils.tryWithSafeFinally {
 fos.write(bytes)
@@ -203,7 +205,9 @@ class CheckpointWriter(
   // If the checkpoint file exists, back it up
   // If the backup exists as well, just delete it, otherwise rename 
will fail
   if (fs.exists(checkpointFile)) {
-fs.delete(backupFile, true) // just in case it exists
+if (fs.exists(backupFile)){
+  fs.delete(backupFile, true) // just in case it exists
+}
 if (!fs.rename(checkpointFile, backupFile)) {
      logWarning("Could not rename " + checkpointFile + " to " + backupFile)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

2015-08-10 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 b104501d3 - a98603f8c


[SPARK-9801] [STREAMING] Check if file exists before deleting temporary files.

Spark Streaming deletes the temp file and backup files without checking whether they exist.

Author: Hao Zhu viadea...@gmail.com

Closes #8082 from viadea/master and squashes the following commits:

242d05f [Hao Zhu] [SPARK-9801][Streaming]No need to check the existence of 
those files
fd143f2 [Hao Zhu] [SPARK-9801][Streaming]Check if backupFile exists before 
deleting backupFile files.
087daf0 [Hao Zhu] SPARK-9801

(cherry picked from commit 3c9802d9400bea802984456683b2736a450ee17e)
Signed-off-by: Tathagata Das tathagata.das1...@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a98603f8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a98603f8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a98603f8

Branch: refs/heads/branch-1.3
Commit: a98603f8c118fcd23efe80ebaa120e47e9785d46
Parents: b104501
Author: Hao Zhu viadea...@gmail.com
Authored: Mon Aug 10 17:17:22 2015 -0700
Committer: Tathagata Das tathagata.das1...@gmail.com
Committed: Mon Aug 10 17:18:03 2015 -0700

--
 .../main/scala/org/apache/spark/streaming/Checkpoint.scala   | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a98603f8/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 832ce78..c1d0fe4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -137,7 +137,9 @@ class CheckpointWriter(
 + ')
 
   // Write checkpoint to temp file
-  fs.delete(tempFile, true)   // just in case it exists
+  if (fs.exists(tempFile)) {
+fs.delete(tempFile, true)   // just in case it exists
+  }
   val fos = fs.create(tempFile)
   fos.write(bytes)
   fos.close()
@@ -145,7 +147,9 @@ class CheckpointWriter(
   // If the checkpoint file exists, back it up
   // If the backup exists as well, just delete it, otherwise rename 
will fail
   if (fs.exists(checkpointFile)) {
-fs.delete(backupFile, true) // just in case it exists
+if (fs.exists(backupFile)){
+  fs.delete(backupFile, true) // just in case it exists
+}
 if (!fs.rename(checkpointFile, backupFile)) {
      logWarning("Could not rename " + checkpointFile + " to " + backupFile)
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[04/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
new file mode 100644
index 000..8f06de7
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -0,0 +1,916 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.parquet.schema.MessageTypeParser
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
+
+abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  /**
+   * Checks whether the reflected Parquet message type for product type `T` 
conforms `messageType`.
+   */
+  protected def testSchemaInference[T <: Product: ClassTag: TypeTag](
+  testName: String,
+  messageType: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+testSchema(
+  testName,
+  StructType.fromAttributes(ScalaReflection.attributesFor[T]),
+  messageType,
+  binaryAsString,
+  int96AsTimestamp,
+  followParquetFormatSpec,
+  isThriftDerived)
+  }
+
+  protected def testParquetToCatalyst(
+  testName: String,
+  sqlSchema: StructType,
+  parquetSchema: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+val converter = new CatalystSchemaConverter(
+  assumeBinaryIsString = binaryAsString,
+  assumeInt96IsTimestamp = int96AsTimestamp,
+  followParquetFormatSpec = followParquetFormatSpec)
+
+    test(s"sql <= parquet: $testName") {
+      val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
+  val expected = sqlSchema
+  assert(
+actual === expected,
+        s"""Schema mismatch.
+           |Expected schema: ${expected.json}
+           |Actual schema:   ${actual.json}
+         """.stripMargin)
+}
+  }
+
+  protected def testCatalystToParquet(
+  testName: String,
+  sqlSchema: StructType,
+  parquetSchema: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+val converter = new CatalystSchemaConverter(
+  assumeBinaryIsString = binaryAsString,
+  assumeInt96IsTimestamp = int96AsTimestamp,
+  followParquetFormatSpec = followParquetFormatSpec)
+
+    test(s"sql => parquet: $testName") {
+  val actual = converter.convert(sqlSchema)
+  val expected = MessageTypeParser.parseMessageType(parquetSchema)
+  actual.checkContains(expected)
+  expected.checkContains(actual)
+}
+  }
+
+  protected def testSchema(
+  testName: String,
+  sqlSchema: StructType,
+  parquetSchema: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+
+testCatalystToParquet(
+  testName,
+  sqlSchema,
+  parquetSchema,
+  binaryAsString,
+  int96AsTimestamp,
+  followParquetFormatSpec,
+  isThriftDerived)
+
+testParquetToCatalyst(
+  testName,
+  sqlSchema,
+  parquetSchema,
+  binaryAsString,
+  int96AsTimestamp,
+  followParquetFormatSpec,
+  isThriftDerived)
+  }
+}
+
+class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  
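
For illustration, a hypothetical round trip through the CatalystSchemaConverter that the helpers above exercise; the message type and field names are invented for the example, not taken from the suite, and since the converter may be package-private this sketch would live in the same package as the suite.

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter

// Parse a hand-written Parquet message type and convert it to a Catalyst StructType.
val converter = new CatalystSchemaConverter(
  assumeBinaryIsString = true,
  assumeInt96IsTimestamp = true,
  followParquetFormatSpec = false)
val parquetSchema = MessageTypeParser.parseMessageType(
  """
    |message root {
    |  required int32 id;
    |  optional binary name (UTF8);
    |}
  """.stripMargin)
val catalystSchema = converter.convert(parquetSchema)
println(catalystSchema.json)  // expect an int field "id" and a string field "name"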

[04/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
new file mode 100644
index 000..8f06de7
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -0,0 +1,916 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.parquet.schema.MessageTypeParser
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.types._
+
+abstract class ParquetSchemaTest extends SparkFunSuite with ParquetTest {
+  val sqlContext = TestSQLContext
+
+  /**
+   * Checks whether the reflected Parquet message type for product type `T` conforms to `messageType`.
+   */
+  protected def testSchemaInference[T <: Product: ClassTag: TypeTag](
+  testName: String,
+  messageType: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+testSchema(
+  testName,
+  StructType.fromAttributes(ScalaReflection.attributesFor[T]),
+  messageType,
+  binaryAsString,
+  int96AsTimestamp,
+  followParquetFormatSpec,
+  isThriftDerived)
+  }
+
+  protected def testParquetToCatalyst(
+  testName: String,
+  sqlSchema: StructType,
+  parquetSchema: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+val converter = new CatalystSchemaConverter(
+  assumeBinaryIsString = binaryAsString,
+  assumeInt96IsTimestamp = int96AsTimestamp,
+  followParquetFormatSpec = followParquetFormatSpec)
+
+test(s"sql <= parquet: $testName") {
+  val actual = converter.convert(MessageTypeParser.parseMessageType(parquetSchema))
+  val expected = sqlSchema
+  assert(
+actual === expected,
+s"""Schema mismatch.
+   |Expected schema: ${expected.json}
+   |Actual schema:   ${actual.json}
+ """.stripMargin)
+}
+  }
+
+  protected def testCatalystToParquet(
+  testName: String,
+  sqlSchema: StructType,
+  parquetSchema: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+val converter = new CatalystSchemaConverter(
+  assumeBinaryIsString = binaryAsString,
+  assumeInt96IsTimestamp = int96AsTimestamp,
+  followParquetFormatSpec = followParquetFormatSpec)
+
+test(s"sql => parquet: $testName") {
+  val actual = converter.convert(sqlSchema)
+  val expected = MessageTypeParser.parseMessageType(parquetSchema)
+  actual.checkContains(expected)
+  expected.checkContains(actual)
+}
+  }
+
+  protected def testSchema(
+  testName: String,
+  sqlSchema: StructType,
+  parquetSchema: String,
+  binaryAsString: Boolean = true,
+  int96AsTimestamp: Boolean = true,
+  followParquetFormatSpec: Boolean = false,
+  isThriftDerived: Boolean = false): Unit = {
+
+testCatalystToParquet(
+  testName,
+  sqlSchema,
+  parquetSchema,
+  binaryAsString,
+  int96AsTimestamp,
+  followParquetFormatSpec,
+  isThriftDerived)
+
+testParquetToCatalyst(
+  testName,
+  sqlSchema,
+  parquetSchema,
+  binaryAsString,
+  int96AsTimestamp,
+  followParquetFormatSpec,
+  isThriftDerived)
+  }
+}
+
+class ParquetSchemaInferenceSuite extends ParquetSchemaTest {
+  
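For orientation, a hypothetical invocation of the testParquetToCatalyst helper defined above might look like the sketch below; the test name and both schemas are invented for illustration, and the suite's existing imports (org.apache.spark.sql.types._, MessageTypeParser) are assumed:

    testParquetToCatalyst(
      "hypothetical: plain repeated int32 column",
      StructType(Seq(
        StructField("f1", ArrayType(IntegerType, containsNull = false), nullable = false))),
      """message root {
        |  repeated int32 f1;
        |}
      """.stripMargin)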

[10/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
new file mode 100644
index 000..0b0867f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
+
+  val parent = sparkUI
+  val listener = sqlContext.listener
+
+  attachPage(new AllExecutionsPage(this))
+  attachPage(new ExecutionPage(this))
+  parent.attachTab(this)
+
+  parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
+
+private[sql] object SQLTab {
+
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+
+  private val nextTabId = new AtomicInteger(0)
+
+  private def nextTabName: String = {
+val nextId = nextTabId.getAndIncrement()
+if (nextId == 0) "SQL" else s"SQL$nextId"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
new file mode 100644
index 000..ae3d752
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+
+/**
+ * A graph used for storing information of an executionPlan of DataFrame.
+ *
+ * Each graph is defined with a set of nodes and a set of edges. Each node 
represents a node in the
+ * SparkPlan tree, and each edge represents a parent-child relationship 
between two nodes.
+ */
+private[ui] case class SparkPlanGraph(
+nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
+
+  def makeDotFile(metrics: Map[Long, Any]): String = {
+val dotFile = new StringBuilder
+dotFile.append("digraph G {\n")
+nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n"))
+edges.foreach(edge => dotFile.append(edge.makeDotEdge + "\n"))
+dotFile.append("}")
+dotFile.toString()
+  }
+}
+
+private[sql] object SparkPlanGraph {
+
+  /**
+   * Build a SparkPlanGraph from the root of a SparkPlan tree.
+   */
+  def apply(plan: SparkPlan): SparkPlanGraph = {
+val nodeIdGenerator = new AtomicLong(0)
+val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
+val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
+buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+new 
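As a rough usage sketch of the class above (the plan value and metric map are placeholders; the real call sites live in the SQL listener/UI code added by this commit):

    // Hypothetical: build the graph for an executed SparkPlan and render it as DOT text.
    val graph = SparkPlanGraph(executedPlan)          // executedPlan: SparkPlan, assumed in scope
    val metricValues: Map[Long, Any] = Map.empty      // accumulator id -> latest metric value
    val dot: String = graph.makeDotFile(metricValues) // yields "digraph G { ... }"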

[08/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
deleted file mode 100644
index b6db71b..000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.net.URI
-import java.util.logging.{Level, Logger => JLogger}
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Try}
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, 
_}
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.{Log => ParquetLog}
-
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-
-private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
-
-  def format(): String = "parquet"
-
-  override def createRelation(
-  sqlContext: SQLContext,
-  paths: Array[String],
-  schema: Option[StructType],
-  partitionColumns: Option[StructType],
-  parameters: Map[String, String]): HadoopFsRelation = {
-new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
-private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
-  extends OutputWriter {
-
-  private val recordWriter: RecordWriter[Void, InternalRow] = {
-val outputFormat = {
-  new ParquetOutputFormat[InternalRow]() {
-// Here we override `getDefaultWorkFile` for two reasons:
-//
-//  1. To allow appending.  We need to generate unique output file 
names to avoid
-// overwriting existing files (either exist before the write job, 
or are just written
-// by other tasks within the same write job).
-//
-//  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
-// `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
-// partitions in the case of dynamic partitioning.
-override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-  val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
-  val split = context.getTaskAttemptID.getTaskID.getId
-  new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
-}
-  }
-}
-
-outputFormat.getRecordWriter(context)
-  }
-
-  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
-
-  override protected[sql] def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(context)
-}
-
-private[sql] class ParquetRelation(
-override val paths: Array[String],
-private val maybeDataSchema: Option[StructType],
-// This is for metastore conversion.
-private 

[03/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
deleted file mode 100644
index 92022ff..000
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ /dev/null
@@ -1,1172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.json
-
-import java.io.{File, StringWriter}
-import java.sql.{Date, Timestamp}
-
-import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.rdd.RDD
-import org.scalactic.Tolerance._
-
-import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf}
-import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, 
LogicalRelation}
-import org.apache.spark.sql.json.InferSchema.compatibleType
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.Utils
-
-class JsonSuite extends QueryTest with SQLTestUtils with TestJsonData {
-
-  protected lazy val ctx = org.apache.spark.sql.test.TestSQLContext
-  override def sqlContext: SQLContext = ctx // used by SQLTestUtils
-
-  import ctx.sql
-  import ctx.implicits._
-
-  test("Type promotion") {
-def checkTypePromotion(expected: Any, actual: Any) {
-  assert(expected.getClass == actual.getClass,
-s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
-  assert(expected == actual,
-s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " +
-  s"${expected}(${expected.getClass}).")
-}
-
-val factory = new JsonFactory()
-def enforceCorrectType(value: Any, dataType: DataType): Any = {
-  val writer = new StringWriter()
-  val generator = factory.createGenerator(writer)
-  generator.writeObject(value)
-  generator.flush()
-
-  val parser = factory.createParser(writer.toString)
-  parser.nextToken()
-  JacksonParser.convertField(factory, parser, dataType)
-}
-
-val intNumber: Int = 2147483647
-checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
-checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, 
LongType))
-checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, 
DoubleType))
-checkTypePromotion(
-  Decimal(intNumber), enforceCorrectType(intNumber, 
DecimalType.SYSTEM_DEFAULT))
-
-val longNumber: Long = 9223372036854775807L
-checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
-checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, 
DoubleType))
-checkTypePromotion(
-  Decimal(longNumber), enforceCorrectType(longNumber, 
DecimalType.SYSTEM_DEFAULT))
-
-val doubleNumber: Double = 1.7976931348623157E308d
-checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, 
DoubleType))
-
-checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new 
Timestamp(intNumber)),
-enforceCorrectType(intNumber, TimestampType))
-checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new 
Timestamp(intNumber.toLong)),
-enforceCorrectType(intNumber.toLong, TimestampType))
-val strTime = "2014-09-30 12:34:56"
-
checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
-enforceCorrectType(strTime, TimestampType))
-
-val strDate = "2014-10-15"
-checkTypePromotion(
-  DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), 
enforceCorrectType(strDate, DateType))
-
-val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
-enforceCorrectType(ISO8601Time1, TimestampType))
-checkTypePromotion(DateTimeUtils.millisToDays(3601000),
-  enforceCorrectType(ISO8601Time1, DateType))
-val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
-checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new 
Timestamp(10801000)),
-  

[02/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
deleted file mode 100644
index b415da5..000
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.TypeTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.parquet.example.data.simple.SimpleGroup
-import org.apache.parquet.example.data.{Group, GroupWriter}
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, 
ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, 
ParquetOutputCommitter, ParquetWriter}
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.{MessageType, MessageTypeParser}
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-// Write support class for nested groups: ParquetWriter initializes 
GroupWriteSupport
-// with an empty configuration (it is after all not intended to be used in 
this way?)
-// and members are private so we need to make our own in order to pass the 
schema
-// to the writer.
-private[parquet] class TestGroupWriteSupport(schema: MessageType) extends 
WriteSupport[Group] {
-  var groupWriter: GroupWriter = null
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-groupWriter = new GroupWriter(recordConsumer, schema)
-  }
-
-  override def init(configuration: Configuration): WriteContext = {
-new WriteContext(schema, new java.util.HashMap[String, String]())
-  }
-
-  override def write(record: Group) {
-groupWriter.write(record)
-  }
-}
-
-/**
- * A test suite that tests basic Parquet I/O.
- */
-class ParquetIOSuite extends QueryTest with ParquetTest {
-  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
-  import sqlContext.implicits._
-
-  /**
-   * Writes `data` to a Parquet file, reads it back and check file contents.
-   */
-  protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = {
-withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
-  }
-
-  test("basic data types (without binary)") {
-val data = (1 to 4).map { i =>
-  (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
-}
-checkParquetFile(data)
-  }
-
-  test("raw binary") {
-val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
-withParquetDataFrame(data) { df =>
-  assertResult(data.map(_._1.mkString(",")).sorted) {
-df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
-  }
-}
-  }
-
-  test("string") {
-val data = (1 to 4).map(i => Tuple1(i.toString))
-// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
-// as we store Spark SQL schema in the extra metadata.
-withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
-withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
-  }
-
-  test("fixed-length decimals") {
-def makeDecimalRDD(decimal: DecimalType): DataFrame =
-  sqlContext.sparkContext
-.parallelize(0 to 1000)
-.map(i => Tuple1(i / 100.0))
-.toDF()
-// Parquet doesn't allow column names with spaces, have to add an alias here
-.select($"_1" cast decimal as "dec")
-
-for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 
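For illustration, a new test written against the checkParquetFile helper above would follow the same pattern as these cases; the test name and tuple data below are made up:

    test("hypothetical: int/string round trip") {
      val data = (1 to 4).map(i => (i, s"val_$i"))
      checkParquetFile(data)   // writes to Parquet, reads back, compares with checkAnswer
    }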

[11/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
new file mode 100644
index 000..4086a13
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.net.URI
+import java.util.logging.{Level, Logger => JLogger}
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.{Failure, Try}
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, 
_}
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.{Log => ParquetLog}
+
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
+
+  override def shortName(): String = "parquet"
+
+  override def createRelation(
+  sqlContext: SQLContext,
+  paths: Array[String],
+  schema: Option[StructType],
+  partitionColumns: Option[StructType],
+  parameters: Map[String, String]): HadoopFsRelation = {
+new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
+  }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
+  extends OutputWriter {
+
+  private val recordWriter: RecordWriter[Void, InternalRow] = {
+val outputFormat = {
+  new ParquetOutputFormat[InternalRow]() {
+// Here we override `getDefaultWorkFile` for two reasons:
+//
+//  1. To allow appending.  We need to generate unique output file 
names to avoid
+// overwriting existing files (either exist before the write job, 
or are just written
+// by other tasks within the same write job).
+//
+//  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
+// `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
+// partitions in the case of dynamic partitioning.
+override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
+  val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+  val split = context.getTaskAttemptID.getTaskID.getId
+  new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+}
+  }
+}
+
+outputFormat.getRecordWriter(context)
+  }
+
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation(
+override val paths: 

[11/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
new file mode 100644
index 000..4086a13
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -0,0 +1,796 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.net.URI
+import java.util.logging.{Level, Logger => JLogger}
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.util.{Failure, Try}
+
+import com.google.common.base.Objects
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, 
_}
+import org.apache.parquet.schema.MessageType
+import org.apache.parquet.{Log => ParquetLog}
+
+import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
+import org.apache.spark.rdd.RDD._
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionSpec
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
+
+
+private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
+
+  override def shortName(): String = "parquet"
+
+  override def createRelation(
+  sqlContext: SQLContext,
+  paths: Array[String],
+  schema: Option[StructType],
+  partitionColumns: Option[StructType],
+  parameters: Map[String, String]): HadoopFsRelation = {
+new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
+  }
+}
+
+// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
+private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
+  extends OutputWriter {
+
+  private val recordWriter: RecordWriter[Void, InternalRow] = {
+val outputFormat = {
+  new ParquetOutputFormat[InternalRow]() {
+// Here we override `getDefaultWorkFile` for two reasons:
+//
+//  1. To allow appending.  We need to generate unique output file 
names to avoid
+// overwriting existing files (either exist before the write job, 
or are just written
+// by other tasks within the same write job).
+//
+//  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
+// `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
+// partitions in the case of dynamic partitioning.
+override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
+  val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
+  val split = context.getTaskAttemptID.getTaskID.getId
+  new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
+}
+  }
+}
+
+outputFormat.getRecordWriter(context)
+  }
+
+  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+
+  override protected[sql] def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
+
+  override def close(): Unit = recordWriter.close(context)
+}
+
+private[sql] class ParquetRelation(
+override val paths: 

[13/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
new file mode 100644
index 000..8eab6a0
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, 
SQLException}
+import java.util.Properties
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.jdbc.JdbcDialects
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+
+/**
+ * Data corresponding to one partition of a JDBCRDD.
+ */
+private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends 
Partition {
+  override def index: Int = idx
+}
+
+
+private[sql] object JDBCRDD extends Logging {
+
+  /**
+   * Maps a JDBC type to a Catalyst type.  This function is called only when
+   * the JdbcDialect class corresponding to your database driver returns null.
+   *
+   * @param sqlType - A field of java.sql.Types
+   * @return The Catalyst type corresponding to sqlType.
+   */
+  private def getCatalystType(
+  sqlType: Int,
+  precision: Int,
+  scale: Int,
+  signed: Boolean): DataType = {
+val answer = sqlType match {
+  // scalastyle:off
+  case java.sql.Types.ARRAY => null
+  case java.sql.Types.BIGINT => if (signed) { LongType } else { DecimalType(20,0) }
+  case java.sql.Types.BINARY => BinaryType
+  case java.sql.Types.BIT => BooleanType // @see JdbcDialect for quirks
+  case java.sql.Types.BLOB => BinaryType
+  case java.sql.Types.BOOLEAN => BooleanType
+  case java.sql.Types.CHAR => StringType
+  case java.sql.Types.CLOB => StringType
+  case java.sql.Types.DATALINK => null
+  case java.sql.Types.DATE => DateType
+  case java.sql.Types.DECIMAL
+if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+  case java.sql.Types.DECIMAL => DecimalType.SYSTEM_DEFAULT
+  case java.sql.Types.DISTINCT => null
+  case java.sql.Types.DOUBLE => DoubleType
+  case java.sql.Types.FLOAT => FloatType
+  case java.sql.Types.INTEGER => if (signed) { IntegerType } else { LongType }
+  case java.sql.Types.JAVA_OBJECT => null
+  case java.sql.Types.LONGNVARCHAR => StringType
+  case java.sql.Types.LONGVARBINARY => BinaryType
+  case java.sql.Types.LONGVARCHAR => StringType
+  case java.sql.Types.NCHAR => StringType
+  case java.sql.Types.NCLOB => StringType
+  case java.sql.Types.NULL => null
+  case java.sql.Types.NUMERIC
+if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+  case java.sql.Types.NUMERIC => DecimalType.SYSTEM_DEFAULT
+  case java.sql.Types.NVARCHAR => StringType
+  case java.sql.Types.OTHER => null
+  case java.sql.Types.REAL => DoubleType
+  case java.sql.Types.REF => StringType
+  case java.sql.Types.ROWID => LongType
+  case java.sql.Types.SMALLINT => IntegerType
+  case java.sql.Types.SQLXML => StringType
+  case java.sql.Types.STRUCT => StringType
+  case java.sql.Types.TIME => TimestampType
+  case java.sql.Types.TIMESTAMP => TimestampType
+  case java.sql.Types.TINYINT => IntegerType
+  case 
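The match above is only the fallback path: a registered JdbcDialect whose canHandle accepts the connection URL is consulted first, and getCatalystType runs only when the dialect declines to map the column. A minimal sketch of such a dialect, assuming the org.apache.spark.sql.jdbc.JdbcDialect API of this release; the dialect object and the jdbc:acmedb URL prefix are hypothetical:

    import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
    import org.apache.spark.sql.types._

    // Hypothetical dialect: map BIT columns of a fictional "acmedb" driver to BooleanType,
    // so the generic fallback above is never reached for them.
    object AcmeDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:acmedb")
      override def getCatalystType(
          sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
        if (sqlType == java.sql.Types.BIT) Some(BooleanType) else None
      }
    }

    JdbcDialects.registerDialect(AcmeDialect)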

[08/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
deleted file mode 100644
index b6db71b..000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ /dev/null
@@ -1,796 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.net.URI
-import java.util.logging.{Level, Logger => JLogger}
-import java.util.{List => JList}
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable
-import scala.util.{Failure, Try}
-
-import com.google.common.base.Objects
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.parquet.filter2.predicate.FilterApi
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
-import org.apache.parquet.hadoop.util.ContextUtil
-import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetRecordReader, 
_}
-import org.apache.parquet.schema.MessageType
-import org.apache.parquet.{Log => ParquetLog}
-
-import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{SqlNewHadoopPartition, SqlNewHadoopRDD, RDD}
-import org.apache.spark.rdd.RDD._
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionSpec
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-
-private[sql] class DefaultSource extends HadoopFsRelationProvider with 
DataSourceRegister {
-
-  def format(): String = "parquet"
-
-  override def createRelation(
-  sqlContext: SQLContext,
-  paths: Array[String],
-  schema: Option[StructType],
-  partitionColumns: Option[StructType],
-  parameters: Map[String, String]): HadoopFsRelation = {
-new ParquetRelation(paths, schema, None, partitionColumns, 
parameters)(sqlContext)
-  }
-}
-
-// NOTE: This class is instantiated and used on executor side only, no need to 
be serializable.
-private[sql] class ParquetOutputWriter(path: String, context: 
TaskAttemptContext)
-  extends OutputWriter {
-
-  private val recordWriter: RecordWriter[Void, InternalRow] = {
-val outputFormat = {
-  new ParquetOutputFormat[InternalRow]() {
-// Here we override `getDefaultWorkFile` for two reasons:
-//
-//  1. To allow appending.  We need to generate unique output file 
names to avoid
-// overwriting existing files (either exist before the write job, 
or are just written
-// by other tasks within the same write job).
-//
-//  2. To allow dynamic partitioning.  Default `getDefaultWorkFile` 
uses
-// `FileOutputCommitter.getWorkPath()`, which points to the base 
directory of all
-// partitions in the case of dynamic partitioning.
-override def getDefaultWorkFile(context: TaskAttemptContext, 
extension: String): Path = {
-  val uniqueWriteJobId = context.getConfiguration.get("spark.sql.sources.writeJobUUID")
-  val split = context.getTaskAttemptID.getTaskID.getId
-  new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$extension")
-}
-  }
-}
-
-outputFormat.getRecordWriter(context)
-  }
-
-  override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
-
-  override protected[sql] def writeInternal(row: InternalRow): Unit = 
recordWriter.write(null, row)
-
-  override def close(): Unit = recordWriter.close(context)
-}
-
-private[sql] class ParquetRelation(
-override val paths: Array[String],
-private val maybeDataSchema: Option[StructType],
-// This is for metastore conversion.
-private 

[10/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
new file mode 100644
index 000..0b0867f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
+
+  val parent = sparkUI
+  val listener = sqlContext.listener
+
+  attachPage(new AllExecutionsPage(this))
+  attachPage(new ExecutionPage(this))
+  parent.attachTab(this)
+
+  parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
+
+private[sql] object SQLTab {
+
+  private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+
+  private val nextTabId = new AtomicInteger(0)
+
+  private def nextTabName: String = {
+val nextId = nextTabId.getAndIncrement()
+if (nextId == 0) "SQL" else s"SQL$nextId"
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
new file mode 100644
index 000..ae3d752
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.ui
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+
+/**
+ * A graph used for storing information of an executionPlan of DataFrame.
+ *
+ * Each graph is defined with a set of nodes and a set of edges. Each node 
represents a node in the
+ * SparkPlan tree, and each edge represents a parent-child relationship 
between two nodes.
+ */
+private[ui] case class SparkPlanGraph(
+nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
+
+  def makeDotFile(metrics: Map[Long, Any]): String = {
+val dotFile = new StringBuilder
+dotFile.append("digraph G {\n")
+nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n"))
+edges.foreach(edge => dotFile.append(edge.makeDotEdge + "\n"))
+dotFile.append("}")
+dotFile.toString()
+  }
+}
+
+private[sql] object SparkPlanGraph {
+
+  /**
+   * Build a SparkPlanGraph from the root of a SparkPlan tree.
+   */
+  def apply(plan: SparkPlan): SparkPlanGraph = {
+val nodeIdGenerator = new AtomicLong(0)
+val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
+val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
+buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+new 

[14/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
[SPARK-9763][SQL] Minimize exposure of internal SQL classes.

There are a few changes in this pull request:

1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. In order to maintain backward compatibility from 1, added a backward 
compatibility translation map in data source resolution.
3. Moved ui and metric package into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format - shortName.
6. Added override modifier on shortName.
7. Removed IntSQLMetric.
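
For items 5 and 6, a minimal sketch of what a source now provides, assuming the DataSourceRegister trait in org.apache.spark.sql.sources; the class and the short name "acme" are hypothetical, while the built-in Parquet source in this patch does the same with "parquet":

    import org.apache.spark.sql.sources.DataSourceRegister

    // Hypothetical data source: exposing a short name lets callers write
    // sqlContext.read.format("acme") instead of the fully qualified class name.
    class DefaultSource extends DataSourceRegister {
      override def shortName(): String = "acme"
    }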

Author: Reynold Xin r...@databricks.com

Closes #8056 from rxin/SPARK-9763 and squashes the following commits:

9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL 
classes.

(cherry picked from commit 40ed2af587cedadc6e5249031857a922b3b234ca)
Signed-off-by: Reynold Xin r...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1838e43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1838e43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1838e43

Branch: refs/heads/branch-1.5
Commit: c1838e4309a2286cebc8bc73907b1c403260f22c
Parents: d251d9f
Author: Reynold Xin r...@databricks.com
Authored: Mon Aug 10 13:49:23 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:49:34 2015 -0700

--
 project/MimaExcludes.scala  |   24 +-
 apache.spark.sql.sources.DataSourceRegister |6 +-
 .../sql/execution/ui/static/spark-sql-viz.css   |   37 +
 .../sql/execution/ui/static/spark-sql-viz.js|  160 +++
 .../spark/sql/ui/static/spark-sql-viz.css   |   37 -
 .../apache/spark/sql/ui/static/spark-sql-viz.js |  160 ---
 .../scala/org/apache/spark/sql/DataFrame.scala  |2 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |6 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |2 +-
 .../spark/sql/execution/SQLExecution.scala  |2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |8 +-
 .../spark/sql/execution/basicOperators.scala|2 +-
 .../sql/execution/datasources/DDLParser.scala   |  185 +++
 .../execution/datasources/DefaultSource.scala   |   64 +
 .../datasources/InsertIntoDataSource.scala  |   23 +-
 .../datasources/ResolvedDataSource.scala|  204 +++
 .../spark/sql/execution/datasources/ddl.scala   |  352 +-
 .../datasources/jdbc/DefaultSource.scala|   62 +
 .../datasources/jdbc/DriverRegistry.scala   |   60 +
 .../datasources/jdbc/DriverWrapper.scala|   48 +
 .../execution/datasources/jdbc/JDBCRDD.scala|  489 
 .../datasources/jdbc/JDBCRelation.scala |  113 ++
 .../execution/datasources/jdbc/JdbcUtils.scala  |  219 
 .../datasources/json/InferSchema.scala  |  207 
 .../datasources/json/JSONRelation.scala |  204 +++
 .../datasources/json/JacksonGenerator.scala |  135 ++
 .../datasources/json/JacksonParser.scala|  228 
 .../datasources/json/JacksonUtils.scala |   32 +
 .../parquet/CatalystReadSupport.scala   |  153 +++
 .../parquet/CatalystRecordMaterializer.scala|   41 +
 .../parquet/CatalystRowConverter.scala  |  449 +++
 .../parquet/CatalystSchemaConverter.scala   |  592 +
 .../parquet/DirectParquetOutputCommitter.scala  |   87 ++
 .../datasources/parquet/ParquetConverter.scala  |   39 +
 .../datasources/parquet/ParquetFilters.scala|  360 ++
 .../datasources/parquet/ParquetRelation.scala   |  796 
 .../parquet/ParquetTableSupport.scala   |  322 +
 .../parquet/ParquetTypesConverter.scala |  159 +++
 .../spark/sql/execution/metric/SQLMetrics.scala |  115 ++
 .../apache/spark/sql/execution/package.scala|8 +-
 .../sql/execution/ui/AllExecutionsPage.scala|  238 
 .../spark/sql/execution/ui/ExecutionPage.scala  |  127 ++
 .../spark/sql/execution/ui/SQLListener.scala|  352 ++
 .../apache/spark/sql/execution/ui/SQLTab.scala  |   49 +
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  118 ++
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala |  490 
 .../apache/spark/sql/jdbc/JDBCRelation.scala|  152 ---
 .../org/apache/spark/sql/jdbc/JdbcUtils.scala   |   52 -
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |  250 
 .../org/apache/spark/sql/json/InferSchema.scala |  207 
 .../apache/spark/sql/json/JSONRelation.scala|  203 ---
 .../spark/sql/json/JacksonGenerator.scala   |  135 --
 

[02/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
deleted file mode 100644
index b415da5..000
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.TypeTag
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
-import org.apache.parquet.example.data.simple.SimpleGroup
-import org.apache.parquet.example.data.{Group, GroupWriter}
-import org.apache.parquet.hadoop.api.WriteSupport
-import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, 
ParquetMetadata}
-import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, 
ParquetOutputCommitter, ParquetWriter}
-import org.apache.parquet.io.api.RecordConsumer
-import org.apache.parquet.schema.{MessageType, MessageTypeParser}
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-// Write support class for nested groups: ParquetWriter initializes 
GroupWriteSupport
-// with an empty configuration (it is after all not intended to be used in 
this way?)
-// and members are private so we need to make our own in order to pass the 
schema
-// to the writer.
-private[parquet] class TestGroupWriteSupport(schema: MessageType) extends 
WriteSupport[Group] {
-  var groupWriter: GroupWriter = null
-
-  override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
-groupWriter = new GroupWriter(recordConsumer, schema)
-  }
-
-  override def init(configuration: Configuration): WriteContext = {
-new WriteContext(schema, new java.util.HashMap[String, String]())
-  }
-
-  override def write(record: Group) {
-groupWriter.write(record)
-  }
-}
-
-/**
- * A test suite that tests basic Parquet I/O.
- */
-class ParquetIOSuite extends QueryTest with ParquetTest {
-  lazy val sqlContext = org.apache.spark.sql.test.TestSQLContext
-  import sqlContext.implicits._
-
-  /**
-   * Writes `data` to a Parquet file, reads it back and check file contents.
-   */
-  protected def checkParquetFile[T <: Product : ClassTag: TypeTag](data: Seq[T]): Unit = {
-withParquetDataFrame(data)(r => checkAnswer(r, data.map(Row.fromTuple)))
-  }
-
-  test("basic data types (without binary)") {
-val data = (1 to 4).map { i =>
-  (i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
-}
-checkParquetFile(data)
-  }
-
-  test("raw binary") {
-val data = (1 to 4).map(i => Tuple1(Array.fill(3)(i.toByte)))
-withParquetDataFrame(data) { df =>
-  assertResult(data.map(_._1.mkString(",")).sorted) {
-df.collect().map(_.getAs[Array[Byte]](0).mkString(",")).sorted
-  }
-}
-  }
-
-  test("string") {
-val data = (1 to 4).map(i => Tuple1(i.toString))
-// Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
-// as we store Spark SQL schema in the extra metadata.
-withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
-withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
-  }
-
-  test("fixed-length decimals") {
-def makeDecimalRDD(decimal: DecimalType): DataFrame =
-  sqlContext.sparkContext
-.parallelize(0 to 1000)
-.map(i => Tuple1(i / 100.0))
-.toDF()
-// Parquet doesn't allow column names with spaces, have to add an alias here
-.select($"_1" cast decimal as "dec")
-
-for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 

spark git commit: [SPARK-9633] [BUILD] SBT download locations outdated; need an update

2015-08-10 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 326988e00 - 119d6cf98


[SPARK-9633] [BUILD] SBT download locations outdated; need an update

Remove 2 defunct SBT download URLs and replace with the 1 known download URL. 
Also, use https.
Follow up on https://github.com/apache/spark/pull/7792

Author: Sean Owen so...@cloudera.com

Closes #7956 from srowen/SPARK-9633 and squashes the following commits:

caa40bd [Sean Owen] Remove 2 defunct SBT download URLs and replace with the 1 
known download URL. Also, use https.

Conflicts:
sbt/sbt-launch-lib.bash


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/119d6cf9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/119d6cf9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/119d6cf9

Branch: refs/heads/branch-1.1
Commit: 119d6cf987e59b92cca2753e921bc4e35dc37514
Parents: 326988e
Author: Sean Owen so...@cloudera.com
Authored: Thu Aug 6 23:43:52 2015 +0100
Committer: Davies Liu davies@gmail.com
Committed: Mon Aug 10 13:14:46 2015 -0700

--
 sbt/sbt-launch-lib.bash | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/119d6cf9/sbt/sbt-launch-lib.bash
--
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index c91fecf..da22ea0 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -38,8 +38,7 @@ dlog () {
 
 acquire_sbt_jar () {
   SBT_VERSION=`awk -F "=" '/sbt\\.version/ {print $2}' ./project/build.properties`
-  URL1=http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
-  URL2=http://repo.typesafe.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
+  URL1=https://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/${SBT_VERSION}/sbt-launch.jar
   JAR=sbt/sbt-launch-${SBT_VERSION}.jar

   sbt_jar=$JAR
@@ -51,9 +50,9 @@ acquire_sbt_jar () {
     printf "Attempting to fetch sbt\n"
     JAR_DL="${JAR}.part"
     if hash curl 2>/dev/null; then
-      (curl --progress-bar ${URL1} > ${JAR_DL} || curl --progress-bar ${URL2} > ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      curl --progress-bar --fail --location ${URL1} > ${JAR_DL} && mv ${JAR_DL} ${JAR}
     elif hash wget 2>/dev/null; then
-      (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) && mv ${JAR_DL} ${JAR}
+      wget --progress=bar ${URL1} -O ${JAR_DL} && mv ${JAR_DL} ${JAR}
     else
       printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
       exit -1





[14/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
[SPARK-9763][SQL] Minimize exposure of internal SQL classes.

There are a few changes in this pull request:

1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. In order to maintain backward compatibility from 1, added a backward 
compatibility translation map in data source resolution.
3. Moved ui and metric package into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format -> shortName (see the short sketch after this list).
6. Added override modifier on shortName.
7. Removed IntSQLMetric.
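
Items 5 and 6 above can be illustrated with a small, hypothetical provider; the class name and the "myformat" alias below are invented, only the trait method name comes from this patch:

  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

  // Hypothetical provider: "myformat" is an invented alias, not a real Spark data source.
  class MyFormatProvider extends RelationProvider with DataSourceRegister {

    // Was `def format()` before this change; the trait method is now `shortName()`.
    override def shortName(): String = "myformat"

    override def createRelation(
        sqlContext: SQLContext,
        parameters: Map[String, String]): BaseRelation = {
      // A real implementation would build a BaseRelation from `parameters` here.
      throw new UnsupportedOperationException("sketch only")
    }
  }

Listing such a class in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister is what makes the alias visible to data source resolution (the same service file touched in this commit).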

Author: Reynold Xin r...@databricks.com

Closes #8056 from rxin/SPARK-9763 and squashes the following commits:

9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL 
classes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40ed2af5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40ed2af5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40ed2af5

Branch: refs/heads/master
Commit: 40ed2af587cedadc6e5249031857a922b3b234ca
Parents: 0fe6674
Author: Reynold Xin r...@databricks.com
Authored: Mon Aug 10 13:49:23 2015 -0700
Committer: Reynold Xin r...@databricks.com
Committed: Mon Aug 10 13:49:23 2015 -0700

--
 project/MimaExcludes.scala  |   24 +-
 apache.spark.sql.sources.DataSourceRegister |6 +-
 .../sql/execution/ui/static/spark-sql-viz.css   |   37 +
 .../sql/execution/ui/static/spark-sql-viz.js|  160 +++
 .../spark/sql/ui/static/spark-sql-viz.css   |   37 -
 .../apache/spark/sql/ui/static/spark-sql-viz.js |  160 ---
 .../scala/org/apache/spark/sql/DataFrame.scala  |2 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |6 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |2 +-
 .../spark/sql/execution/SQLExecution.scala  |2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |8 +-
 .../spark/sql/execution/basicOperators.scala|2 +-
 .../sql/execution/datasources/DDLParser.scala   |  185 +++
 .../execution/datasources/DefaultSource.scala   |   64 +
 .../datasources/InsertIntoDataSource.scala  |   23 +-
 .../datasources/ResolvedDataSource.scala|  204 +++
 .../spark/sql/execution/datasources/ddl.scala   |  352 +-
 .../datasources/jdbc/DefaultSource.scala|   62 +
 .../datasources/jdbc/DriverRegistry.scala   |   60 +
 .../datasources/jdbc/DriverWrapper.scala|   48 +
 .../execution/datasources/jdbc/JDBCRDD.scala|  489 
 .../datasources/jdbc/JDBCRelation.scala |  113 ++
 .../execution/datasources/jdbc/JdbcUtils.scala  |  219 
 .../datasources/json/InferSchema.scala  |  207 
 .../datasources/json/JSONRelation.scala |  204 +++
 .../datasources/json/JacksonGenerator.scala |  135 ++
 .../datasources/json/JacksonParser.scala|  228 
 .../datasources/json/JacksonUtils.scala |   32 +
 .../parquet/CatalystReadSupport.scala   |  153 +++
 .../parquet/CatalystRecordMaterializer.scala|   41 +
 .../parquet/CatalystRowConverter.scala  |  449 +++
 .../parquet/CatalystSchemaConverter.scala   |  592 +
 .../parquet/DirectParquetOutputCommitter.scala  |   87 ++
 .../datasources/parquet/ParquetConverter.scala  |   39 +
 .../datasources/parquet/ParquetFilters.scala|  360 ++
 .../datasources/parquet/ParquetRelation.scala   |  796 
 .../parquet/ParquetTableSupport.scala   |  322 +
 .../parquet/ParquetTypesConverter.scala |  159 +++
 .../spark/sql/execution/metric/SQLMetrics.scala |  115 ++
 .../apache/spark/sql/execution/package.scala|8 +-
 .../sql/execution/ui/AllExecutionsPage.scala|  238 
 .../spark/sql/execution/ui/ExecutionPage.scala  |  127 ++
 .../spark/sql/execution/ui/SQLListener.scala|  352 ++
 .../apache/spark/sql/execution/ui/SQLTab.scala  |   49 +
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  118 ++
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala |  490 
 .../apache/spark/sql/jdbc/JDBCRelation.scala|  152 ---
 .../org/apache/spark/sql/jdbc/JdbcUtils.scala   |   52 -
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |  250 
 .../org/apache/spark/sql/json/InferSchema.scala |  207 
 .../apache/spark/sql/json/JSONRelation.scala|  203 ---
 .../spark/sql/json/JacksonGenerator.scala   |  135 --
 .../apache/spark/sql/json/JacksonParser.scala   |  228 
 .../apache/spark/sql/json/JacksonUtils.scala|   32 -
 .../apache/spark/sql/metric/SQLMetrics.scala 

[09/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
deleted file mode 100644
index 975fec1..000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/CatalystReadSupport.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.util.{Map = JMap}
-
-import scala.collection.JavaConversions.{iterableAsScalaIterable, 
mapAsJavaMap, mapAsScalaMap}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
-import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
-
-private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
-  override def prepareForRead(
-      conf: Configuration,
-      keyValueMetaData: JMap[String, String],
-      fileSchema: MessageType,
-      readContext: ReadContext): RecordMaterializer[InternalRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: $fileSchema")
-
-    val toCatalyst = new CatalystSchemaConverter(conf)
-    val parquetRequestedSchema = readContext.getRequestedSchema
-
-    val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata =>
-        metadata
-          // First tries to read requested schema, which may result from projections
-          .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
-          // If not available, tries to read Catalyst schema from file metadata.  It's only
-          // available if the target file is written by Spark SQL.
-          .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
-      }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet schema")
-        toCatalyst.convert(parquetRequestedSchema)
-      }
-
-    logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema")
-    new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
-  }
-
-  override def init(context: InitContext): ReadContext = {
-val conf = context.getConfiguration
-
-// If the target file was written by Spark SQL, we should be able to find 
a serialized Catalyst
-// schema of this file from its the metadata.
-val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
-
-// Optional schema of requested columns, in the form of a string 
serialized from a Catalyst
-// `StructType` containing all requested columns.
-val maybeRequestedSchema = 
Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
-
-// Below we construct a Parquet schema containing all requested columns.  
This schema tells
-// Parquet which columns to read.
-//
-// If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet 
schema.  Otherwise,
-// we have to fallback to the full file schema which contains all columns 
in the file.
-// Obviously this may waste IO bandwidth since it may read more columns 
than requested.
-//
-// Two things to note:
-//
-// 1. It's possible that some requested columns don't exist in the target 
Parquet file.  For
-//example, in the case of schema merging, the globally merged schema 
may contain extra
-//columns gathered from other Parquet files.  These columns will be 
simply filled with nulls
-//when actually reading the target Parquet file.
-//
-// 2. When `maybeRequestedSchema` is available, we can't simply convert 
the Catalyst schema to
-//Parquet schema using `CatalystSchemaConverter`, because the mapping 
is not unique due to
-//

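Stripped of Parquet plumbing, the schema resolution in prepareForRead above is an Option fallback chain: requested Catalyst schema, then the schema Spark SQL stored in file metadata, then a plain conversion of the Parquet schema. A stand-alone sketch under that reading; the function and parameter names are invented, only the metadata keys and StructType.fromString appear in the code above:

  import org.apache.spark.sql.types.StructType

  // Sketch only: mirrors the fallback order, not the real CatalystReadSupport.
  def resolveReadSchema(
      requestedSchemaJson: Option[String],    // value under SPARK_ROW_REQUESTED_SCHEMA
      sparkFileMetadataJson: Option[String],  // value under SPARK_METADATA_KEY
      convertParquetSchema: () => StructType): StructType = {
    requestedSchemaJson
      .orElse(sparkFileMetadataJson)          // only present if Spark SQL wrote the file
      .map(StructType.fromString)             // both are serialized Catalyst schemas
      .getOrElse(convertParquetSchema())      // last resort: convert the Parquet schema
  }
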
[12/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
new file mode 100644
index 000..3542dfb
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.math.{BigDecimal, BigInteger}
+import java.nio.ByteOrder
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.parquet.column.Dictionary
+import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, 
PrimitiveConverter}
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ParentContainerUpdater]] is used by a Parquet converter to set 
converted values to some
+ * corresponding parent container. For example, a converter for a `StructType` 
field may set
+ * converted values to a [[MutableRow]]; or a converter for array elements may 
append converted
+ * values to an [[ArrayBuffer]].
+ */
+private[parquet] trait ParentContainerUpdater {
+  def set(value: Any): Unit = ()
+  def setBoolean(value: Boolean): Unit = set(value)
+  def setByte(value: Byte): Unit = set(value)
+  def setShort(value: Short): Unit = set(value)
+  def setInt(value: Int): Unit = set(value)
+  def setLong(value: Long): Unit = set(value)
+  def setFloat(value: Float): Unit = set(value)
+  def setDouble(value: Double): Unit = set(value)
+}
+
+/** A no-op updater used for root converter (who doesn't have a parent). */
+private[parquet] object NoopUpdater extends ParentContainerUpdater
+
+/**
+ * A [[CatalystRowConverter]] is used to convert Parquet structs into Spark 
SQL [[InternalRow]]s.
+ * Since any Parquet record is also a struct, this converter can also be used 
as root converter.
+ *
+ * When used as a root converter, [[NoopUpdater]] should be used since root 
converters don't have
+ * any parent container.
+ *
+ * @param parquetType Parquet schema of Parquet records
+ * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type
+ * @param updater An updater which propagates converted field values to the 
parent container
+ */
+private[parquet] class CatalystRowConverter(
+parquetType: GroupType,
+catalystType: StructType,
+updater: ParentContainerUpdater)
+  extends GroupConverter {
+
+  /**
+   * Updater used together with field converters within a [[CatalystRowConverter]].  It
+   * propagates converted field values to the `ordinal`-th cell in `currentRow`.
+   */
+  private final class RowUpdater(row: MutableRow, ordinal: Int) extends 
ParentContainerUpdater {
+override def set(value: Any): Unit = row(ordinal) = value
+override def setBoolean(value: Boolean): Unit = row.setBoolean(ordinal, 
value)
+override def setByte(value: Byte): Unit = row.setByte(ordinal, value)
+override def setShort(value: Short): Unit = row.setShort(ordinal, value)
+override def setInt(value: Int): Unit = row.setInt(ordinal, value)
+override def setLong(value: Long): Unit = row.setLong(ordinal, value)
+override def setDouble(value: Double): Unit = row.setDouble(ordinal, value)
+override def setFloat(value: Float): Unit = row.setFloat(ordinal, value)
+  }
+
+  /**
+   * Represents the converted row object once an entire Parquet record is 
converted.
+   *
+   * @todo Uses [[UnsafeRow]] for better performance.
+   */
+  val currentRow = new 

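The ParentContainerUpdater scaladoc above mentions appending array elements to an ArrayBuffer; a minimal sketch of such an updater (illustrative only, not the converter this patch adds):

  import scala.collection.mutable.ArrayBuffer

  // Each converted element value is pushed into `buffer`; a surrounding array converter
  // would later turn the buffer into the final Catalyst array value.
  class ArrayElementUpdater(buffer: ArrayBuffer[Any]) extends ParentContainerUpdater {
    override def set(value: Any): Unit = buffer += value
  }
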
[03/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
deleted file mode 100644
index 92022ff..000
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ /dev/null
@@ -1,1172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.json
-
-import java.io.{File, StringWriter}
-import java.sql.{Date, Timestamp}
-
-import com.fasterxml.jackson.core.JsonFactory
-import org.apache.spark.rdd.RDD
-import org.scalactic.Tolerance._
-
-import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf}
-import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, 
LogicalRelation}
-import org.apache.spark.sql.json.InferSchema.compatibleType
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.Utils
-
-class JsonSuite extends QueryTest with SQLTestUtils with TestJsonData {
-
-  protected lazy val ctx = org.apache.spark.sql.test.TestSQLContext
-  override def sqlContext: SQLContext = ctx // used by SQLTestUtils
-
-  import ctx.sql
-  import ctx.implicits._
-
-  test("Type promotion") {
-    def checkTypePromotion(expected: Any, actual: Any) {
-      assert(expected.getClass == actual.getClass,
-        s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
-      assert(expected == actual,
-        s"Promoted value ${actual}(${actual.getClass}) does not equal the expected value " +
-          s"${expected}(${expected.getClass}).")
-}
-
-val factory = new JsonFactory()
-def enforceCorrectType(value: Any, dataType: DataType): Any = {
-  val writer = new StringWriter()
-  val generator = factory.createGenerator(writer)
-  generator.writeObject(value)
-  generator.flush()
-
-  val parser = factory.createParser(writer.toString)
-  parser.nextToken()
-  JacksonParser.convertField(factory, parser, dataType)
-}
-
-val intNumber: Int = 2147483647
-checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
-checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, 
LongType))
-checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, 
DoubleType))
-checkTypePromotion(
-  Decimal(intNumber), enforceCorrectType(intNumber, 
DecimalType.SYSTEM_DEFAULT))
-
-val longNumber: Long = 9223372036854775807L
-checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
-checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, 
DoubleType))
-checkTypePromotion(
-  Decimal(longNumber), enforceCorrectType(longNumber, 
DecimalType.SYSTEM_DEFAULT))
-
-val doubleNumber: Double = 1.7976931348623157E308d
-checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, 
DoubleType))
-
-checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new 
Timestamp(intNumber)),
-enforceCorrectType(intNumber, TimestampType))
-checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new 
Timestamp(intNumber.toLong)),
-enforceCorrectType(intNumber.toLong, TimestampType))
-    val strTime = "2014-09-30 12:34:56"
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
-      enforceCorrectType(strTime, TimestampType))
-
-    val strDate = "2014-10-15"
-    checkTypePromotion(
-      DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
-
-    val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
-      enforceCorrectType(ISO8601Time1, TimestampType))
-    checkTypePromotion(DateTimeUtils.millisToDays(3601000),
-      enforceCorrectType(ISO8601Time1, DateType))
-    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
-    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
-  

[01/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 d251d9ff0 -> c1838e430


http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 1a4d41b..392da0b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -20,9 +20,37 @@ package org.apache.spark.sql.sources
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
+
+// please note that the META-INF/services had to be modified for the test directory for this to work
+class DDLSourceLoadSuite extends DataSourceTest {
+
+  test("data sources with the same name") {
+    intercept[RuntimeException] {
+      caseInsensitiveContext.read.format("Fluet da Bomb").load()
+    }
+  }
+
+  test("load data source from format alias") {
+    caseInsensitiveContext.read.format("gathering quorum").load().schema ==
+      StructType(Seq(StructField("stringType", StringType, nullable = false)))
+  }
+
+  test("specify full classname with duplicate formats") {
+    caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
+  }
+
+  test("should fail to load ORC without HiveContext") {
+    intercept[ClassNotFoundException] {
+      caseInsensitiveContext.read.format("orc").load()
+    }
+  }
+}
+
+
 class FakeSourceOne extends RelationProvider with DataSourceRegister {
 
-  def format(): String = "Fluet da Bomb"
+  def shortName(): String = "Fluet da Bomb"
 
   override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
     new BaseRelation {
@@ -35,7 +63,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister {
 
 class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
 
-  def format(): String = "Fluet da Bomb"
+  def shortName(): String = "Fluet da Bomb"
 
   override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
     new BaseRelation {
@@ -48,7 +76,7 @@ class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
 
 class FakeSourceThree extends RelationProvider with DataSourceRegister {
 
-  def format(): String = "gathering quorum"
+  def shortName(): String = "gathering quorum"
 
   override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
     new BaseRelation {
@@ -58,28 +86,3 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister {
         StructType(Seq(StructField("stringType", StringType, nullable = false)))
     }
 }
-// please note that the META-INF/services had to be modified for the test directory for this to work
-class DDLSourceLoadSuite extends DataSourceTest {
-
-  test("data sources with the same name") {
-    intercept[RuntimeException] {
-      caseInsensitiveContext.read.format("Fluet da Bomb").load()
-    }
-  }
-
-  test("load data source from format alias") {
-    caseInsensitiveContext.read.format("gathering quorum").load().schema ==
-      StructType(Seq(StructField("stringType", StringType, nullable = false)))
-  }
-
-  test("specify full classname with duplicate formats") {
-    caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
-      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
-  }
-
-  test("Loading Orc") {
-    intercept[ClassNotFoundException] {
-      caseInsensitiveContext.read.format("orc").load()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 3cbf546..27d1cd9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -22,14 +22,39 @@ import 
org.apache.spark.sql.execution.datasources.ResolvedDataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
 
-  test("builtin sources") {
-    assert(ResolvedDataSource.lookupDataSource("jdbc") ===
-      classOf[org.apache.spark.sql.jdbc.DefaultSource])
+  test("jdbc") {
+    assert(
+      ResolvedDataSource.lookupDataSource("jdbc") ===
+      classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+    assert(
+      

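For context, the aliases exercised by the relocated tests map onto the public read path roughly as follows; the input path is made up, a SQLContext is assumed to be in scope, and either spelling should resolve to the same relation after this change:

  // Short name registered via DataSourceRegister ...
  val byAlias = sqlContext.read.format("json").load("/tmp/people.json")

  // ... or the fully qualified provider class looked up by ResolvedDataSource.
  val byClassName = sqlContext.read
    .format("org.apache.spark.sql.execution.datasources.json.DefaultSource")
    .load("/tmp/people.json")
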
[06/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
--
diff --git 
a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
 
b/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
deleted file mode 100644
index 354c9d7..000
--- 
a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
+++ /dev/null
@@ -1,1001 +0,0 @@
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.spark.sql.parquet.test.avro;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class ParquetAvroCompat extends 
org.apache.avro.specific.SpecificRecordBase implements 
org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse({\type\:\record\,\name\:\ParquetAvroCompat\,\namespace\:\org.apache.spark.sql.parquet.test.avro\,\fields\:[{\name\:\bool_column\,\type\:\boolean\},{\name\:\int_column\,\type\:\int\},{\name\:\long_column\,\type\:\long\},{\name\:\float_column\,\type\:\float\},{\name\:\double_column\,\type\:\double\},{\name\:\binary_column\,\type\:\bytes\},{\name\:\string_column\,\type\:{\type\:\string\,\avro.java.string\:\String\}},{\name\:\maybe_bool_column\,\type\:[\null\,\boolean\]},{\name\:\maybe_int_column\,\type\:[\null\,\int\]},{\name\:\maybe_long_column\,\type\:[\null\,\long\]},{\name\:\maybe_float_column\,\type\:[\null\,\float\]},{\name\:\maybe_double_column\,\type\:[\null\,\double\]},{\name\:\maybe_binary_column\,\type\:[\null\,\bytes\]},{\name\:\maybe_string
 
_column\,\type\:[\null\,{\type\:\string\,\avro.java.string\:\String\}]},{\name\:\strings_column\,\type\:{\type\:\array\,\items\:{\type\:\string\,\avro.java.string\:\String\}}},{\name\:\string_to_int_column\,\type\:{\type\:\map\,\values\:\int\,\avro.java.string\:\String\}},{\name\:\complex_column\,\type\:{\type\:\map\,\values\:{\type\:\array\,\items\:{\type\:\record\,\name\:\Nested\,\fields\:[{\name\:\nested_ints_column\,\type\:{\type\:\array\,\items\:\int\}},{\name\:\nested_string_column\,\type\:{\type\:\string\,\avro.java.string\:\String\}}]}},\avro.java.string\:\String\}}]});
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public boolean bool_column;
-  @Deprecated public int int_column;
-  @Deprecated public long long_column;
-  @Deprecated public float float_column;
-  @Deprecated public double double_column;
-  @Deprecated public java.nio.ByteBuffer binary_column;
-  @Deprecated public java.lang.String string_column;
-  @Deprecated public java.lang.Boolean maybe_bool_column;
-  @Deprecated public java.lang.Integer maybe_int_column;
-  @Deprecated public java.lang.Long maybe_long_column;
-  @Deprecated public java.lang.Float maybe_float_column;
-  @Deprecated public java.lang.Double maybe_double_column;
-  @Deprecated public java.nio.ByteBuffer maybe_binary_column;
-  @Deprecated public java.lang.String maybe_string_column;
-  @Deprecated public java.util.List<java.lang.String> strings_column;
-  @Deprecated public java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
-  @Deprecated public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use <code>newBuilder()</code>. 
-   */
-  public ParquetAvroCompat() {}
-
-  /**
-   * All-args constructor.
-   */
-  public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.List<java.lang.String> strings_column, java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column, java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column) {
-this.bool_column = bool_column;
-this.int_column = int_column;
-this.long_column = long_column;
-this.float_column = float_column;
-this.double_column = double_column;
-this.binary_column = binary_column;
-this.string_column = string_column;
-this.maybe_bool_column = maybe_bool_column;
-this.maybe_int_column = maybe_int_column;
-this.maybe_long_column = maybe_long_column;
-this.maybe_float_column = maybe_float_column;
-this.maybe_double_column = 

[13/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
new file mode 100644
index 000..8eab6a0
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, 
SQLException}
+import java.util.Properties
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.jdbc.JdbcDialects
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+
+/**
+ * Data corresponding to one partition of a JDBCRDD.
+ */
+private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends 
Partition {
+  override def index: Int = idx
+}
+
+
+private[sql] object JDBCRDD extends Logging {
+
+  /**
+   * Maps a JDBC type to a Catalyst type.  This function is called only when
+   * the JdbcDialect class corresponding to your database driver returns null.
+   *
+   * @param sqlType - A field of java.sql.Types
+   * @return The Catalyst type corresponding to sqlType.
+   */
+  private def getCatalystType(
+  sqlType: Int,
+  precision: Int,
+  scale: Int,
+  signed: Boolean): DataType = {
+    val answer = sqlType match {
+      // scalastyle:off
+      case java.sql.Types.ARRAY         => null
+      case java.sql.Types.BIGINT        => if (signed) { LongType } else { DecimalType(20,0) }
+      case java.sql.Types.BINARY        => BinaryType
+      case java.sql.Types.BIT           => BooleanType // @see JdbcDialect for quirks
+      case java.sql.Types.BLOB          => BinaryType
+      case java.sql.Types.BOOLEAN       => BooleanType
+      case java.sql.Types.CHAR          => StringType
+      case java.sql.Types.CLOB          => StringType
+      case java.sql.Types.DATALINK      => null
+      case java.sql.Types.DATE          => DateType
+      case java.sql.Types.DECIMAL
+        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+      case java.sql.Types.DECIMAL       => DecimalType.SYSTEM_DEFAULT
+      case java.sql.Types.DISTINCT      => null
+      case java.sql.Types.DOUBLE        => DoubleType
+      case java.sql.Types.FLOAT         => FloatType
+      case java.sql.Types.INTEGER       => if (signed) { IntegerType } else { LongType }
+      case java.sql.Types.JAVA_OBJECT   => null
+      case java.sql.Types.LONGNVARCHAR  => StringType
+      case java.sql.Types.LONGVARBINARY => BinaryType
+      case java.sql.Types.LONGVARCHAR   => StringType
+      case java.sql.Types.NCHAR         => StringType
+      case java.sql.Types.NCLOB         => StringType
+      case java.sql.Types.NULL          => null
+      case java.sql.Types.NUMERIC
+        if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
+      case java.sql.Types.NUMERIC       => DecimalType.SYSTEM_DEFAULT
+      case java.sql.Types.NVARCHAR      => StringType
+      case java.sql.Types.OTHER         => null
+      case java.sql.Types.REAL          => DoubleType
+      case java.sql.Types.REF           => StringType
+      case java.sql.Types.ROWID         => LongType
+      case java.sql.Types.SMALLINT      => IntegerType
+      case java.sql.Types.SQLXML        => StringType
+      case java.sql.Types.STRUCT        => StringType
+      case java.sql.Types.TIME          => TimestampType
+      case java.sql.Types.TIMESTAMP     => TimestampType
+      case java.sql.Types.TINYINT       => IntegerType
+      case 

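As the comment on getCatalystType above notes, this table is only a fallback used when the registered JdbcDialect declines the type. A hedged sketch of such a dialect; the URL prefix and the NUMERIC-to-LongType choice are invented for illustration:

  import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
  import org.apache.spark.sql.types.{DataType, LongType, MetadataBuilder}

  // Hypothetical dialect for an imaginary "exampledb" JDBC driver.
  object ExampleDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:exampledb")

    // Returning Some(...) here means the fallback table above is never consulted.
    override def getCatalystType(
        sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
      if (sqlType == java.sql.Types.NUMERIC) Some(LongType) else None
    }
  }

  // Registering the dialect makes JDBCRDD ask it before using the mapping above.
  JdbcDialects.registerDialect(ExampleDialect)
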
[05/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/c1838e43/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
new file mode 100644
index 000..6b62c9a
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
+
+trait TestJsonData {
+
+  protected def ctx: SQLContext
+
+  def primitiveFieldAndType: RDD[String] =
+ctx.sparkContext.parallelize(
+  {string:this is a simple string.,
+  integer:10,
+  long:21474836470,
+  bigInteger:92233720368547758070,
+  double:1.7976931348623157E308,
+  boolean:true,
+  null:null
+  }  :: Nil)
+
+  def primitiveFieldValueTypeConflict: RDD[String] =
+ctx.sparkContext.parallelize(
+  {num_num_1:11, num_num_2:null, num_num_3: 1.1,
+  num_bool:true, num_str:13.1, str_bool:str1} ::
+  {num_num_1:null, num_num_2:21474836470.9, num_num_3: null,
+  num_bool:12, num_str:null, str_bool:true} ::
+  {num_num_1:21474836470, num_num_2:92233720368547758070, 
num_num_3: 100,
+  num_bool:false, num_str:str1, str_bool:false} ::
+  {num_num_1:21474836570, num_num_2:1.1, num_num_3: 21474836470,
+  num_bool:null, num_str:92233720368547758070, str_bool:null} 
:: Nil)
+
+  def jsonNullStruct: RDD[String] =
+ctx.sparkContext.parallelize(
+  
{nullstr:,ip:27.31.100.29,headers:{Host:1.abc.com,Charset:UTF-8}}
 ::
+{nullstr:,ip:27.31.100.29,headers:{}} ::
+{nullstr:,ip:27.31.100.29,headers:} ::
+{nullstr:null,ip:27.31.100.29,headers:null} :: Nil)
+
+  def complexFieldValueTypeConflict: RDD[String] =
+ctx.sparkContext.parallelize(
+  {num_struct:11, str_array:[1, 2, 3],
+  array:[], struct_array:[], struct: {}} ::
+  {num_struct:{field:false}, str_array:null,
+  array:null, struct_array:{}, struct: null} ::
+  {num_struct:null, str_array:str,
+  array:[4, 5, 6], struct_array:[7, 8, 9], struct: 
{field:null}} ::
+  {num_struct:{}, str_array:[str1, str2, 33],
+  array:[7], struct_array:{field: true}, struct: {field: 
str}} :: Nil)
+
+  def arrayElementTypeConflict: RDD[String] =
+ctx.sparkContext.parallelize(
+  {array1: [1, 1.1, true, null, [], {}, [2,3,4], {field:str}],
+  array2: [{field:214748364700}, {field:1}]} ::
+  {array3: [{field:str}, {field:1}]} ::
+  {array3: [1, 2, 3]} :: Nil)
+
+  def missingFields: RDD[String] =
+ctx.sparkContext.parallelize(
+  {a:true} ::
+  {b:21474836470} ::
+  {c:[33, 44]} ::
+  {d:{field:true}} ::
+  {e:str} :: Nil)
+
+  def complexFieldAndType1: RDD[String] =
+ctx.sparkContext.parallelize(
+  {struct:{field1: true, field2: 92233720368547758070},
+  structWithArrayFields:{field1:[4, 5, 6], field2:[str1, 
str2]},
+  arrayOfString:[str1, str2],
+  arrayOfInteger:[1, 2147483647, -2147483648],
+  arrayOfLong:[21474836470, 9223372036854775807, 
-9223372036854775808],
+  arrayOfBigInteger:[922337203685477580700, -922337203685477580800],
+  arrayOfDouble:[1.2, 1.7976931348623157E308, 4.9E-324, 
2.2250738585072014E-308],
+  arrayOfBoolean:[true, false, true],
+  arrayOfNull:[null, null, null, null],
+  arrayOfStruct:[{field1: true, field2: str1}, {field1: 
false}, {field3: null}],
+  arrayOfArray1:[[1, 2, 3], [str1, str2]],
+  arrayOfArray2:[[1, 2, 3], [1.1, 2.1, 3.1]]
+ }  :: Nil)
+
+  def complexFieldAndType2: RDD[String] =
+ctx.sparkContext.parallelize(
+  {arrayOfStruct:[{field1: true, field2: str1}, {field1: 
false}, {field3: null}],
+  complexArrayOfStruct: [
+  {
+field1: [
+

[07/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
deleted file mode 100644
index 1ba50b9..000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ui/SparkPlanGraph.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.ui
-
-import java.util.concurrent.atomic.AtomicLong
-
-import scala.collection.mutable
-
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
-
-/**
- * A graph used for storing information of an executionPlan of DataFrame.
- *
- * Each graph is defined with a set of nodes and a set of edges. Each node 
represents a node in the
- * SparkPlan tree, and each edge represents a parent-child relationship 
between two nodes.
- */
-private[ui] case class SparkPlanGraph(
-nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
-
-  def makeDotFile(metrics: Map[Long, Any]): String = {
-    val dotFile = new StringBuilder
-    dotFile.append("digraph G {\n")
-    nodes.foreach(node => dotFile.append(node.makeDotNode(metrics) + "\n"))
-    edges.foreach(edge => dotFile.append(edge.makeDotEdge + "\n"))
-    dotFile.append("}")
-    dotFile.toString()
-  }
-}
-
-private[sql] object SparkPlanGraph {
-
-  /**
-   * Build a SparkPlanGraph from the root of a SparkPlan tree.
-   */
-  def apply(plan: SparkPlan): SparkPlanGraph = {
-val nodeIdGenerator = new AtomicLong(0)
-val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
-val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
-new SparkPlanGraph(nodes, edges)
-  }
-
-  private def buildSparkPlanGraphNode(
-  plan: SparkPlan,
-  nodeIdGenerator: AtomicLong,
-  nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-  edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
-      SQLPlanMetric(metric.name.getOrElse(key), metric.id,
-        metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
-    }
-    val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
-    nodes += node
-    val childrenNodes = plan.children.map(
-      child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
-    for (child <- childrenNodes) {
-      edges += SparkPlanGraphEdge(child.id, node.id)
-}
-node
-  }
-}
-
-/**
- * Represent a node in the SparkPlan tree, along with its metrics.
- *
- * @param id generated by SparkPlanGraph. There is no duplicate id in a graph
- * @param name the name of this SparkPlan node
- * @param metrics metrics that this SparkPlan node will track
- */
-private[ui] case class SparkPlanGraphNode(
-id: Long, name: String, desc: String, metrics: Seq[SQLPlanMetric]) {
-
-  def makeDotNode(metricsValue: Map[Long, Any]): String = {
-    val values = {
-      for (metric <- metrics;
-           value <- metricsValue.get(metric.accumulatorId)) yield {
-        metric.name + ": " + value
-      }
-    }
-    val label = if (values.isEmpty) {
-        name
-      } else {
-        // If there are metrics, display all metrics in a separate line. We should use an escaped
-        // "\n" here to follow the dot syntax.
-        //
-        // Note: whitespace between two "\n"s is to create an empty line between the name of
-        // SparkPlan and metrics. If removing it, it won't display the empty line in UI.
-        name + "\\n \\n" + values.mkString("\\n")
-      }
-    s"""  $id [label="$label"];"""
-  }
-}
-
-/**
- * Represent an edge in the SparkPlan tree. `fromId` is the parent node id, 
and `toId` is the child
- * node id.
- */
-private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
-
-  def makeDotEdge: String = s"  $fromId->$toId;\n"
-}


[01/14] spark git commit: [SPARK-9763][SQL] Minimize exposure of internal SQL classes.

2015-08-10 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 0fe66744f -> 40ed2af58


http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index 1a4d41b..392da0b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -20,9 +20,37 @@ package org.apache.spark.sql.sources
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
+
+// please note that the META-INF/services had to be modified for the test directory for this to work
+class DDLSourceLoadSuite extends DataSourceTest {
+
+  test("data sources with the same name") {
+    intercept[RuntimeException] {
+      caseInsensitiveContext.read.format("Fluet da Bomb").load()
+    }
+  }
+
+  test("load data source from format alias") {
+    caseInsensitiveContext.read.format("gathering quorum").load().schema ==
+      StructType(Seq(StructField("stringType", StringType, nullable = false)))
+  }
+
+  test("specify full classname with duplicate formats") {
+    caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
+      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
+  }
+
+  test("should fail to load ORC without HiveContext") {
+    intercept[ClassNotFoundException] {
+      caseInsensitiveContext.read.format("orc").load()
+    }
+  }
+}
+
+
 class FakeSourceOne extends RelationProvider with DataSourceRegister {
 
-  def format(): String = "Fluet da Bomb"
+  def shortName(): String = "Fluet da Bomb"
 
   override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
     new BaseRelation {
@@ -35,7 +63,7 @@ class FakeSourceOne extends RelationProvider with DataSourceRegister {
 
 class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
 
-  def format(): String = "Fluet da Bomb"
+  def shortName(): String = "Fluet da Bomb"
 
   override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
     new BaseRelation {
@@ -48,7 +76,7 @@ class FakeSourceTwo extends RelationProvider  with DataSourceRegister {
 
 class FakeSourceThree extends RelationProvider with DataSourceRegister {
 
-  def format(): String = "gathering quorum"
+  def shortName(): String = "gathering quorum"
 
   override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
     new BaseRelation {
@@ -58,28 +86,3 @@ class FakeSourceThree extends RelationProvider with DataSourceRegister {
         StructType(Seq(StructField("stringType", StringType, nullable = false)))
     }
 }
-// please note that the META-INF/services had to be modified for the test directory for this to work
-class DDLSourceLoadSuite extends DataSourceTest {
-
-  test("data sources with the same name") {
-    intercept[RuntimeException] {
-      caseInsensitiveContext.read.format("Fluet da Bomb").load()
-    }
-  }
-
-  test("load data source from format alias") {
-    caseInsensitiveContext.read.format("gathering quorum").load().schema ==
-      StructType(Seq(StructField("stringType", StringType, nullable = false)))
-  }
-
-  test("specify full classname with duplicate formats") {
-    caseInsensitiveContext.read.format("org.apache.spark.sql.sources.FakeSourceOne")
-      .load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
-  }
-
-  test("Loading Orc") {
-    intercept[ClassNotFoundException] {
-      caseInsensitiveContext.read.format("orc").load()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 3cbf546..27d1cd9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -22,14 +22,39 @@ import 
org.apache.spark.sql.execution.datasources.ResolvedDataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
 
-  test("builtin sources") {
-    assert(ResolvedDataSource.lookupDataSource("jdbc") ===
-      classOf[org.apache.spark.sql.jdbc.DefaultSource])
+  test("jdbc") {
+    assert(
+      ResolvedDataSource.lookupDataSource("jdbc") ===
+      classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
+    assert(
+