spark git commit: [MINOR] Add 1.3, 1.3.1 to master branch EC2 scripts

2015-05-17 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/master ba4f8ca0d -> 1a7b9ce80


[MINOR] Add 1.3, 1.3.1 to master branch EC2 scripts

cc pwendell

P.S.: I can't believe this was outdated all along?

Author: Shivaram Venkataraman 

Closes #6215 from shivaram/update-ec2-map and squashes the following commits:

ae3937a [Shivaram Venkataraman] Add 1.3, 1.3.1 to master branch EC2 scripts


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a7b9ce8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a7b9ce8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a7b9ce8

Branch: refs/heads/master
Commit: 1a7b9ce80bb5649796dda48d6a6d662a2809d0ef
Parents: ba4f8ca
Author: Shivaram Venkataraman 
Authored: Sun May 17 00:12:20 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Sun May 17 00:12:20 2015 -0700

--
 ec2/spark_ec2.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a7b9ce8/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index ab4a96f..be92d5f 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -48,7 +48,7 @@ else:
 from urllib.request import urlopen, Request
 from urllib.error import HTTPError
 
-SPARK_EC2_VERSION = "1.2.1"
+SPARK_EC2_VERSION = "1.3.1"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
 
 VALID_SPARK_VERSIONS = set([
@@ -65,6 +65,8 @@ VALID_SPARK_VERSIONS = set([
 "1.1.1",
 "1.2.0",
 "1.2.1",
+"1.3.0",
+"1.3.1",
 ])
 
 SPARK_TACHYON_MAP = {
@@ -75,6 +77,8 @@ SPARK_TACHYON_MAP = {
 "1.1.1": "0.5.0",
 "1.2.0": "0.5.0",
 "1.2.1": "0.5.0",
+"1.3.0": "0.5.0",
+"1.3.1": "0.5.0",
 }
 
 DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION





spark git commit: [MINOR] Add 1.3, 1.3.1 to master branch EC2 scripts

2015-05-17 Thread shivaram
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 671a6bca5 -> 0ed376afa


[MINOR] Add 1.3, 1.3.1 to master branch EC2 scripts

cc pwendell

P.S.: I can't believe this was outdated all along?

Author: Shivaram Venkataraman 

Closes #6215 from shivaram/update-ec2-map and squashes the following commits:

ae3937a [Shivaram Venkataraman] Add 1.3, 1.3.1 to master branch EC2 scripts

(cherry picked from commit 1a7b9ce80bb5649796dda48d6a6d662a2809d0ef)
Signed-off-by: Shivaram Venkataraman 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ed376af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ed376af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ed376af

Branch: refs/heads/branch-1.4
Commit: 0ed376afad603b7afd86bb8eb312cad6edae2b9c
Parents: 671a6bc
Author: Shivaram Venkataraman 
Authored: Sun May 17 00:12:20 2015 -0700
Committer: Shivaram Venkataraman 
Committed: Sun May 17 00:12:46 2015 -0700

--
 ec2/spark_ec2.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0ed376af/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index ab4a96f..be92d5f 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -48,7 +48,7 @@ else:
 from urllib.request import urlopen, Request
 from urllib.error import HTTPError
 
-SPARK_EC2_VERSION = "1.2.1"
+SPARK_EC2_VERSION = "1.3.1"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
 
 VALID_SPARK_VERSIONS = set([
@@ -65,6 +65,8 @@ VALID_SPARK_VERSIONS = set([
 "1.1.1",
 "1.2.0",
 "1.2.1",
+"1.3.0",
+"1.3.1",
 ])
 
 SPARK_TACHYON_MAP = {
@@ -75,6 +77,8 @@ SPARK_TACHYON_MAP = {
 "1.1.1": "0.5.0",
 "1.2.0": "0.5.0",
 "1.2.1": "0.5.0",
+"1.3.0": "0.5.0",
+"1.3.1": "0.5.0",
 }
 
 DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION





spark git commit: [SQL] [MINOR] Skip unresolved expression for InConversion

2015-05-17 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 1a7b9ce80 -> edf09ea1b


[SQL] [MINOR] Skip unresolved expression for InConversion

Author: scwf 

Closes #6145 from scwf/InConversion and squashes the following commits:

5c8ac6b [scwf] minir fix for InConversion
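
For context, a toy sketch of why the rule below needs the new guard (these are made-up classes, not Catalyst's): asking an unresolved expression for its dataType throws, so a coercion rule that compares dataTypes must first skip nodes whose children are unresolved.

sealed trait Expr { def resolved: Boolean; def dataType: String }

case class Unresolved(name: String) extends Expr {
  val resolved = false
  def dataType: String = throw new UnsupportedOperationException(s"$name is not resolved yet")
}

case class Col(name: String, dataType: String) extends Expr { val resolved = true }

case class In(value: Expr, list: Seq[Expr]) extends Expr {
  def childrenResolved: Boolean = (value +: list).forall(_.resolved)
  val resolved: Boolean = childrenResolved
  val dataType = "boolean"
}

def coerce(e: Expr): Expr = e match {
  case i: In if !i.childrenResolved => i                              // the guard added by this patch
  case i @ In(v, list) if list.exists(_.dataType != v.dataType) => i  // safe: children are resolved here
  case other => other
}

coerce(In(Unresolved("a"), Seq(Col("b", "int"))))  // returned unchanged instead of throwing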


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/edf09ea1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/edf09ea1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/edf09ea1

Branch: refs/heads/master
Commit: edf09ea1bd4bf7692e0085ad9c70cb1bfc8d06d8
Parents: 1a7b9ce
Author: scwf 
Authored: Sun May 17 15:17:11 2015 +0800
Committer: Cheng Lian 
Committed: Sun May 17 15:17:11 2015 +0800

--
 .../org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/edf09ea1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index fe0d3f2..b45b17d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -296,6 +296,9 @@ trait HiveTypeCoercion {
*/
   object InConversion extends Rule[LogicalPlan] {
 def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+  // Skip nodes whose children have not been resolved yet.
+  case e if !e.childrenResolved => e 
+  
   case i @ In(a, b) if b.exists(_.dataType != a.dataType) =>
 i.makeCopy(Array(a, b.map(Cast(_, a.dataType
 }





spark git commit: [SPARK-7447] [SQL] Don't re-merge Parquet schema when the relation is deserialized

2015-05-17 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master edf09ea1b -> 339905578


[SPARK-7447] [SQL] Don't re-merge Parquet schema when the relation is 
deserialized

JIRA: https://issues.apache.org/jira/browse/SPARK-7447

`MetadataCache` in `ParquetRelation2` is annotated as `transient`. When 
`ParquetRelation2` is deserialized, we ask `MetadataCache` to refresh and 
perform schema merging again. This is time-consuming, especially when there 
are many Parquet files.

With the new `FSBasedParquetRelation`, `MetadataCache` is no longer 
`transient`, but `MetadataCache.refresh()` still performs schema merging again 
when the relation is deserialized.
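
As a minimal sketch of the pattern applied below (Relation and expensiveMerge are stand-ins, not Spark's classes): keep the merged result in a non-transient field and guard refresh() with a null check, so a deserialized copy reuses it instead of re-merging.

import java.io._

class Relation extends Serializable {
  private var cachedSchema: String = null        // not @transient, so it survives serialization

  def refresh(): Unit = {
    if (cachedSchema == null) {                  // the guard: merge only once
      cachedSchema = expensiveMerge()
    }
  }

  private def expensiveMerge(): String = "merged-schema"  // stand-in for reading many footers
  def schema: String = cachedSchema
}

val original = new Relation
original.refresh()                               // pays the merge cost once

val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(original)
oos.close()

val copy = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[Relation]
copy.refresh()                                   // no-op: the schema travelled with the relation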

Author: Liang-Chi Hsieh 

Closes #6012 from viirya/without_remerge_schema and squashes the following 
commits:

2663957 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into 
without_remerge_schema
6ac7d93 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into 
without_remerge_schema
b0fc09b [Liang-Chi Hsieh] Don't generate and merge parquetSchema multiple times.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33990557
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33990557
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33990557

Branch: refs/heads/master
Commit: 339905578790fa37fcad9684b859b443313a5aa2
Parents: edf09ea
Author: Liang-Chi Hsieh 
Authored: Sun May 17 15:42:21 2015 +0800
Committer: Cheng Lian 
Committed: Sun May 17 15:42:21 2015 +0800

--
 .../apache/spark/sql/parquet/newParquet.scala   | 32 +++-
 1 file changed, 18 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/33990557/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 946062f..bcbdb1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -340,7 +340,7 @@ private[sql] class ParquetRelation2(
 
 // Schema of the actual Parquet files, without partition columns 
discovered from partition
 // directory paths.
-var dataSchema: StructType = _
+var dataSchema: StructType = null
 
 // Schema of the whole table, including partition columns.
 var schema: StructType = _
@@ -379,19 +379,23 @@ private[sql] class ParquetRelation2(
 f -> new Footer(f.getPath, parquetMetadata)
   }.seq.toMap
 
-  dataSchema = {
-val dataSchema0 =
-  maybeDataSchema
-.orElse(readSchema())
-.orElse(maybeMetastoreSchema)
-.getOrElse(sys.error("Failed to get the schema."))
-
-// If this Parquet relation is converted from a Hive Metastore table, 
must reconcile case
-// case insensitivity issue and possible schema mismatch (probably 
caused by schema
-// evolution).
-maybeMetastoreSchema
-  .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
-  .getOrElse(dataSchema0)
+  // If we already get the schema, don't need to re-compute it since the 
schema merging is
+  // time-consuming.
+  if (dataSchema == null) {
+dataSchema = {
+  val dataSchema0 =
+maybeDataSchema
+  .orElse(readSchema())
+  .orElse(maybeMetastoreSchema)
+  .getOrElse(sys.error("Failed to get the schema."))
+
+  // If this Parquet relation is converted from a Hive Metastore 
table, must reconcile case
+  // case insensitivity issue and possible schema mismatch (probably 
caused by schema
+  // evolution).
+  maybeMetastoreSchema
+.map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
+.getOrElse(dataSchema0)
+}
   }
 }
 





spark git commit: [SPARK-7447] [SQL] Don't re-merge Parquet schema when the relation is deserialized

2015-05-17 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0ed376afa -> 898be6248


[SPARK-7447] [SQL] Don't re-merge Parquet schema when the relation is 
deserialized

JIRA: https://issues.apache.org/jira/browse/SPARK-7447

`MetadataCache` in `ParquetRelation2` is annotated as `transient`. When 
`ParquetRelation2` is deserialized, we ask `MetadataCache` to refresh and 
perform schema merging again. This is time-consuming, especially when there 
are many Parquet files.

With the new `FSBasedParquetRelation`, `MetadataCache` is no longer 
`transient`, but `MetadataCache.refresh()` still performs schema merging again 
when the relation is deserialized.

Author: Liang-Chi Hsieh 

Closes #6012 from viirya/without_remerge_schema and squashes the following 
commits:

2663957 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into 
without_remerge_schema
6ac7d93 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into 
without_remerge_schema
b0fc09b [Liang-Chi Hsieh] Don't generate and merge parquetSchema multiple times.

(cherry picked from commit 339905578790fa37fcad9684b859b443313a5aa2)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/898be624
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/898be624
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/898be624

Branch: refs/heads/branch-1.4
Commit: 898be6248940acddab6f40af0fbc7e7abb3adb76
Parents: 0ed376a
Author: Liang-Chi Hsieh 
Authored: Sun May 17 15:42:21 2015 +0800
Committer: Cheng Lian 
Committed: Sun May 17 15:42:40 2015 +0800

--
 .../apache/spark/sql/parquet/newParquet.scala   | 32 +++-
 1 file changed, 18 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/898be624/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 946062f..bcbdb1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -340,7 +340,7 @@ private[sql] class ParquetRelation2(
 
 // Schema of the actual Parquet files, without partition columns 
discovered from partition
 // directory paths.
-var dataSchema: StructType = _
+var dataSchema: StructType = null
 
 // Schema of the whole table, including partition columns.
 var schema: StructType = _
@@ -379,19 +379,23 @@ private[sql] class ParquetRelation2(
 f -> new Footer(f.getPath, parquetMetadata)
   }.seq.toMap
 
-  dataSchema = {
-val dataSchema0 =
-  maybeDataSchema
-.orElse(readSchema())
-.orElse(maybeMetastoreSchema)
-.getOrElse(sys.error("Failed to get the schema."))
-
-// If this Parquet relation is converted from a Hive Metastore table, 
must reconcile case
-// case insensitivity issue and possible schema mismatch (probably 
caused by schema
-// evolution).
-maybeMetastoreSchema
-  .map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
-  .getOrElse(dataSchema0)
+  // If we already get the schema, don't need to re-compute it since the 
schema merging is
+  // time-consuming.
+  if (dataSchema == null) {
+dataSchema = {
+  val dataSchema0 =
+maybeDataSchema
+  .orElse(readSchema())
+  .orElse(maybeMetastoreSchema)
+  .getOrElse(sys.error("Failed to get the schema."))
+
+  // If this Parquet relation is converted from a Hive Metastore 
table, must reconcile case
+  // case insensitivity issue and possible schema mismatch (probably 
caused by schema
+  // evolution).
+  maybeMetastoreSchema
+.map(ParquetRelation2.mergeMetastoreParquetSchema(_, dataSchema0))
+.getOrElse(dataSchema0)
+}
   }
 }
 





spark git commit: [SPARK-7669] Builds against Hadoop 2.6+ get inconsistent curator depend…

2015-05-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 898be6248 -> 0feb3ded2


[SPARK-7669] Builds against Hadoop 2.6+ get inconsistent curator depend…

This adds a new profile, `hadoop-2.6`, copying over the hadoop-2.4 properties, 
updating ZooKeeper to 3.4.6, and making the Curator version a configurable 
option. That keeps the curator-recipes JAR in sync with the one used in Hadoop.

There's one more option to consider: declaring the curator-client version 
explicitly with its own dependency entry. This would pin down the version 
pulled in transitively by the Hadoop and Hive imports.

Author: Steve Loughran 

Closes #6191 from steveloughran/stevel/SPARK-7669-hadoop-2.6 and squashes the 
following commits:

e3e281a [Steve Loughran] SPARK-7669 declare the version of curator-client and 
curator-framework JARs
2901ea9 [Steve Loughran] SPARK-7669 Builds against Hadoop 2.6+ get inconsistent 
curator dependencies

(cherry picked from commit 50217667cc1239ed3b15f4d10907b727ed85d7fa)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0feb3ded
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0feb3ded
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0feb3ded

Branch: refs/heads/branch-1.4
Commit: 0feb3ded2e1681e0c282b73a1ec5af454bc78a13
Parents: 898be62
Author: Steve Loughran 
Authored: Sun May 17 17:03:11 2015 +0100
Committer: Sean Owen 
Committed: Sun May 17 17:03:20 2015 +0100

--
 pom.xml | 26 --
 1 file changed, 24 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0feb3ded/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 1b45cdb..6768a03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
 hbase
 1.4.0
 3.4.5
+2.4.0
 org.spark-project.hive
 
 0.13.1a
@@ -707,7 +708,7 @@
   
 org.apache.curator
 curator-recipes
-2.4.0
+${curator.version}
 ${hadoop.deps.scope}
 
   
@@ -717,6 +718,16 @@
 
   
   
+org.apache.curator
+curator-client
+${curator.version}
+  
+  
+org.apache.curator
+curator-framework
+${curator.version}
+  
+  
 org.apache.hadoop
 hadoop-client
 ${hadoop.version}
@@ -1680,6 +1691,17 @@
 
 
 
+  hadoop-2.6
+  
+2.6.0
+0.9.3
+3.1.1
+3.4.6
+2.6.0
+  
+
+
+
   yarn
   
 yarn
@@ -1709,7 +1731,7 @@
 
   org.apache.curator
   curator-recipes
-  2.4.0
+  ${curator.version}
   
 
   org.apache.zookeeper





spark git commit: [SPARK-7669] Builds against Hadoop 2.6+ get inconsistent curator depend…

2015-05-17 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 339905578 -> 50217667c


[SPARK-7669] Builds against Hadoop 2.6+ get inconsistent curator depend…

This adds a new profile, `hadoop-2.6`, copying over the hadoop-2.4 properties, 
updating ZooKeeper to 3.4.6, and making the Curator version a configurable 
option. That keeps the curator-recipes JAR in sync with the one used in Hadoop.

There's one more option to consider: declaring the curator-client version 
explicitly with its own dependency entry. This would pin down the version 
pulled in transitively by the Hadoop and Hive imports.

Author: Steve Loughran 

Closes #6191 from steveloughran/stevel/SPARK-7669-hadoop-2.6 and squashes the 
following commits:

e3e281a [Steve Loughran] SPARK-7669 declare the version of curator-client and 
curator-framework JARs
2901ea9 [Steve Loughran] SPARK-7669 Builds against Hadoop 2.6+ get inconsistent 
curator dependencies


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50217667
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50217667
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50217667

Branch: refs/heads/master
Commit: 50217667cc1239ed3b15f4d10907b727ed85d7fa
Parents: 3399055
Author: Steve Loughran 
Authored: Sun May 17 17:03:11 2015 +0100
Committer: Sean Owen 
Committed: Sun May 17 17:03:11 2015 +0100

--
 pom.xml | 26 --
 1 file changed, 24 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/50217667/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 1b45cdb..6768a03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,6 +130,7 @@
 hbase
 1.4.0
 3.4.5
+2.4.0
 org.spark-project.hive
 
 0.13.1a
@@ -707,7 +708,7 @@
   
 org.apache.curator
 curator-recipes
-2.4.0
+${curator.version}
 ${hadoop.deps.scope}
 
   
@@ -717,6 +718,16 @@
 
   
   
+org.apache.curator
+curator-client
+${curator.version}
+  
+  
+org.apache.curator
+curator-framework
+${curator.version}
+  
+  
 org.apache.hadoop
 hadoop-client
 ${hadoop.version}
@@ -1680,6 +1691,17 @@
 
 
 
+  hadoop-2.6
+  
+2.6.0
+0.9.3
+3.1.1
+3.4.6
+2.6.0
+  
+
+
+
   yarn
   
 yarn
@@ -1709,7 +1731,7 @@
 
   org.apache.curator
   curator-recipes
-  2.4.0
+  ${curator.version}
   
 
   org.apache.zookeeper





spark git commit: [SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

2015-05-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 50217667c -> f2cc6b5bc


[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent 
and to guard against write-after-`close()` bugs. This is a workaround for 
https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent 
`close()` method can lead to stream corruption. We can remove this workaround 
if we upgrade to a snappy-java version that contains my fix for this bug, but 
in the meantime this patch offers a backportable Spark fix.
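
A minimal usage sketch (the codec class and behaviour are from this patch; the in-memory byte sink is only for illustration): double-closing is now a no-op, and writing after close fails fast instead of silently corrupting the compressed output.

import java.io.ByteArrayOutputStream
import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

val codec = new SnappyCompressionCodec(new SparkConf())
val out = codec.compressedOutputStream(new ByteArrayOutputStream())

out.write(Array[Byte](1, 2, 3))
out.close()
out.close()       // second close is ignored instead of re-emitting internal buffers
// out.write(4)   // would now throw IOException("Stream is closed") rather than corrupt the stream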

Author: Josh Rosen 

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following 
commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2cc6b5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2cc6b5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2cc6b5b

Branch: refs/heads/master
Commit: f2cc6b5bccc3a70fd7d69183b1a068800831fe19
Parents: 5021766
Author: Josh Rosen 
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen 
Committed: Sun May 17 09:30:49 2015 -0700

--
 .../org/apache/spark/io/CompressionCodec.scala  | 49 +++-
 .../unsafe/UnsafeShuffleWriterSuite.java|  8 
 2 files changed, 47 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f2cc6b5b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
--
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb..0d8ac1f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
 val blockSize = 
conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-new SnappyOutputStream(s, blockSize)
+new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new 
SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close 
and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we 
upgrade to a version
+ * of snappy-java that contains the fix for 
https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends 
OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.flush()
+  }
+
+  override def close(): Unit = {
+if (!closed) {
+  closed = true
+  os.close()
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f2cc6b5b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
--
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 78e5264..730d265 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,7 +35,6 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
@@ -97,13 

spark git commit: [SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

2015-05-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0feb3ded2 -> 6df71eb8c


[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent 
and to guard against write-after-`close()` bugs. This is a workaround for 
https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent 
`close()` method can lead to stream corruption. We can remove this workaround 
if we upgrade to a snappy-java version that contains my fix for this bug, but 
in the meantime this patch offers a backportable Spark fix.

Author: Josh Rosen 

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following 
commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660

(cherry picked from commit f2cc6b5bccc3a70fd7d69183b1a068800831fe19)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6df71eb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6df71eb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6df71eb8

Branch: refs/heads/branch-1.4
Commit: 6df71eb8c1e05d2bcec16aab986d4cc15ac4aa8b
Parents: 0feb3de
Author: Josh Rosen 
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen 
Committed: Sun May 17 09:33:49 2015 -0700

--
 .../org/apache/spark/io/CompressionCodec.scala  | 49 +++-
 .../unsafe/UnsafeShuffleWriterSuite.java|  8 
 2 files changed, 47 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6df71eb8/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
--
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb..0d8ac1f 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
 val blockSize = 
conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-new SnappyOutputStream(s, blockSize)
+new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new 
SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close 
and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we 
upgrade to a version
+ * of snappy-java that contains the fix for 
https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends 
OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.flush()
+  }
+
+  override def close(): Unit = {
+if (!closed) {
+  closed = true
+  os.close()
+}
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6df71eb8/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
--
diff --git 
a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
 
b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 78e5264..730d265 100644
--- 
a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ 
b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -35,7 +35,6 @@ import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.xerial.snappy.buffer.CachedBufferAllocator;
 import static org.hamcrest.MatcherAssert.assertThat;
 i

spark git commit: [SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

2015-05-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 91442fdfc -> 0a6310373


[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent 
and to guard against write-after-`close()` bugs. This is a workaround for 
https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent 
`close()` method can lead to stream corruption. We can remove this workaround 
if we upgrade to a snappy-java version that contains my fix for this bug, but 
in the meantime this patch offers a backportable Spark fix.

Author: Josh Rosen 

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following 
commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660

(cherry picked from commit f2cc6b5bccc3a70fd7d69183b1a068800831fe19)
Signed-off-by: Josh Rosen 

Conflicts:
core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a631037
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a631037
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a631037

Branch: refs/heads/branch-1.3
Commit: 0a63103739db029c1e871ed0f6c36aa43e326121
Parents: 91442fd
Author: Josh Rosen 
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen 
Committed: Sun May 17 09:38:31 2015 -0700

--
 .../org/apache/spark/io/CompressionCodec.scala  | 49 +++-
 1 file changed, 47 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0a631037/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
--
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0709b6d..2fcb53d 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -154,8 +154,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
 val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 
32768)
-new SnappyOutputStream(s, blockSize)
+new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new 
SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close 
and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we 
upgrade to a version
+ * of snappy-java that contains the fix for 
https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends 
OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.flush()
+  }
+
+  override def close(): Unit = {
+if (!closed) {
+  closed = true
+  os.close()
+}
+  }
+}





spark git commit: [SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

2015-05-17 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 5505a0d07 -> 6c41e1cb9


[SPARK-7660] Wrap SnappyOutputStream to work around snappy-java bug

This patch wraps `SnappyOutputStream` to ensure that `close()` is idempotent 
and to guard against write-after-`close()` bugs. This is a workaround for 
https://github.com/xerial/snappy-java/issues/107, a bug where a non-idempotent 
`close()` method can lead to stream corruption. We can remove this workaround 
if we upgrade to a snappy-java version that contains my fix for this bug, but 
in the meantime this patch offers a backportable Spark fix.

Author: Josh Rosen 

Closes #6176 from JoshRosen/SPARK-7660-wrap-snappy and squashes the following 
commits:

8b77aae [Josh Rosen] Wrap SnappyOutputStream to fix SPARK-7660

(cherry picked from commit f2cc6b5bccc3a70fd7d69183b1a068800831fe19)
Signed-off-by: Josh Rosen 

Conflicts:
core/src/main/scala/org/apache/spark/io/CompressionCodec.scala

core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c41e1cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c41e1cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c41e1cb

Branch: refs/heads/branch-1.2
Commit: 6c41e1cb9c4913d9d539cf7a5b4fe6cb2c075032
Parents: 5505a0d
Author: Josh Rosen 
Authored: Sun May 17 09:30:49 2015 -0700
Committer: Josh Rosen 
Committed: Sun May 17 09:41:08 2015 -0700

--
 .../org/apache/spark/io/CompressionCodec.scala  | 49 +++-
 1 file changed, 47 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6c41e1cb/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
--
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 1ac7f4e..2343e69 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.io
 
-import java.io.{InputStream, OutputStream}
+import java.io.{IOException, InputStream, OutputStream}
 
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
@@ -122,8 +122,53 @@ class SnappyCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
 val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 
32768)
-new SnappyOutputStream(s, blockSize)
+new SnappyOutputStreamWrapper(new SnappyOutputStream(s, blockSize))
   }
 
   override def compressedInputStream(s: InputStream): InputStream = new 
SnappyInputStream(s)
 }
+
+/**
+ * Wrapper over [[SnappyOutputStream]] which guards against write-after-close 
and double-close
+ * issues. See SPARK-7660 for more details. This wrapping can be removed if we 
upgrade to a version
+ * of snappy-java that contains the fix for 
https://github.com/xerial/snappy-java/issues/107.
+ */
+private final class SnappyOutputStreamWrapper(os: SnappyOutputStream) extends 
OutputStream {
+
+  private[this] var closed: Boolean = false
+
+  override def write(b: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte]): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b)
+  }
+
+  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.write(b, off, len)
+  }
+
+  override def flush(): Unit = {
+if (closed) {
+  throw new IOException("Stream is closed")
+}
+os.flush()
+  }
+
+  override def close(): Unit = {
+if (!closed) {
+  closed = true
+  os.close()
+}
+  }
+}





spark git commit: [SPARK-7686] [SQL] DescribeCommand is assigned wrong output attributes in SparkStrategies

2015-05-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master f2cc6b5bc -> 564562874


[SPARK-7686] [SQL] DescribeCommand is assigned wrong output attributes in 
SparkStrategies

In `SparkStrategies`, `RunnableDescribeCommand` is called with the output 
attributes of the table being described rather than the attributes for the 
`describe` command's output.  I discovered this issue because it caused type 
conversion errors in some UnsafeRow conversion code that I'm writing.

Author: Josh Rosen 

Closes #6217 from JoshRosen/SPARK-7686 and squashes the following commits:

953a344 [Josh Rosen] Fix SPARK-7686 with a simple change in SparkStrategies.
a4eec9f [Josh Rosen] Add failing regression test for SPARK-7686


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56456287
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56456287
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56456287

Branch: refs/heads/master
Commit: 564562874f589c4c8bcabcd9d6eb9a6b0eada938
Parents: f2cc6b5
Author: Josh Rosen 
Authored: Sun May 17 11:59:28 2015 -0700
Committer: Reynold Xin 
Committed: Sun May 17 11:59:28 2015 -0700

--
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
 .../test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala | 6 ++
 2 files changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/56456287/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index af0029c..3f6a034 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -354,10 +354,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case c: CreateTableUsingAsSelect if !c.temporary =>
 sys.error("Tables created with SQLContext must be TEMPORARY. Use a 
HiveContext instead.")
 
-  case LogicalDescribeCommand(table, isExtended) =>
+  case describe @ LogicalDescribeCommand(table, isExtended) =>
 val resultPlan = self.sqlContext.executePlan(table).executedPlan
 ExecutedCommand(
-  RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) 
:: Nil
+  RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: 
Nil
 
   case _ => Nil
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/56456287/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 6664e8d..f5106f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -99,4 +99,10 @@ class DDLTestSuite extends DataSourceTest {
 Row("arrayType", "array", ""),
 Row("structType", "struct", "")
   ))
+
+  test("SPARK-7686 DescribeCommand should have correct physical plan output 
attributes") {
+val attributes = sql("describe 
ddlPeople").queryExecution.executedPlan.output
+assert(attributes.map(_.name) === Seq("col_name", "data_type", "comment"))
+assert(attributes.map(_.dataType).toSet === Set(StringType))
+  }
 }





spark git commit: [SPARK-7686] [SQL] DescribeCommand is assigned wrong output attributes in SparkStrategies

2015-05-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 6df71eb8c -> 53d6ab51b


[SPARK-7686] [SQL] DescribeCommand is assigned wrong output attributes in 
SparkStrategies

In `SparkStrategies`, `RunnableDescribeCommand` is called with the output 
attributes of the table being described rather than the attributes for the 
`describe` command's output.  I discovered this issue because it caused type 
conversion errors in some UnsafeRow conversion code that I'm writing.

Author: Josh Rosen 

Closes #6217 from JoshRosen/SPARK-7686 and squashes the following commits:

953a344 [Josh Rosen] Fix SPARK-7686 with a simple change in SparkStrategies.
a4eec9f [Josh Rosen] Add failing regression test for SPARK-7686

(cherry picked from commit 564562874f589c4c8bcabcd9d6eb9a6b0eada938)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53d6ab51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53d6ab51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53d6ab51

Branch: refs/heads/branch-1.4
Commit: 53d6ab51b21de9a1bfc355f8493fcd6e9159ab07
Parents: 6df71eb
Author: Josh Rosen 
Authored: Sun May 17 11:59:28 2015 -0700
Committer: Reynold Xin 
Committed: Sun May 17 11:59:35 2015 -0700

--
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++--
 .../test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala | 6 ++
 2 files changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/53d6ab51/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index af0029c..3f6a034 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -354,10 +354,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
   case c: CreateTableUsingAsSelect if !c.temporary =>
 sys.error("Tables created with SQLContext must be TEMPORARY. Use a 
HiveContext instead.")
 
-  case LogicalDescribeCommand(table, isExtended) =>
+  case describe @ LogicalDescribeCommand(table, isExtended) =>
 val resultPlan = self.sqlContext.executePlan(table).executedPlan
 ExecutedCommand(
-  RunnableDescribeCommand(resultPlan, resultPlan.output, isExtended)) 
:: Nil
+  RunnableDescribeCommand(resultPlan, describe.output, isExtended)) :: 
Nil
 
   case _ => Nil
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/53d6ab51/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
index 6664e8d..f5106f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLTestSuite.scala
@@ -99,4 +99,10 @@ class DDLTestSuite extends DataSourceTest {
 Row("arrayType", "array", ""),
 Row("structType", "struct", "")
   ))
+
+  test("SPARK-7686 DescribeCommand should have correct physical plan output 
attributes") {
+val attributes = sql("describe 
ddlPeople").queryExecution.executedPlan.output
+assert(attributes.map(_.name) === Seq("col_name", "data_type", "comment"))
+assert(attributes.map(_.dataType).toSet === Set(StringType))
+  }
 }





spark git commit: [SPARK-7491] [SQL] Allow configuration of classloader isolation for hive

2015-05-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 564562874 -> 2ca60ace8


[SPARK-7491] [SQL] Allow configuration of classloader isolation for hive

Author: Michael Armbrust 

Closes #6167 from marmbrus/configureIsolation and squashes the following 
commits:

6147cbe [Michael Armbrust] filter other conf
22cc3bc7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into 
configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader 
isolation for hive
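
A hedged usage sketch of the two configuration keys documented in the diff below. The key names and their semantics come from this patch; the example prefixes, the local master, and the assumption that the isolated metastore client is only built lazily, after these setConf calls, are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("isolation-demo").setMaster("local[*]"))
val hiveContext = new HiveContext(sc)

// Classes matching sharedPrefixes (e.g. the JDBC driver used to reach the metastore) are
// loaded by the classloader shared with Hive; barrierPrefixes are re-loaded per Hive version.
hiveContext.setConf("spark.sql.hive.metastore.sharedPrefixes", "com.mysql.jdbc,org.postgresql")
hiveContext.setConf("spark.sql.hive.metastore.barrierPrefixes", "org.mycompany.hive.udf")

hiveContext.sql("SHOW TABLES").collect()  // first metastore access constructs the isolated client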


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ca60ace
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ca60ace
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ca60ace

Branch: refs/heads/master
Commit: 2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c
Parents: 5645628
Author: Michael Armbrust 
Authored: Sun May 17 12:43:15 2015 -0700
Committer: Michael Armbrust 
Committed: Sun May 17 12:43:15 2015 -0700

--
 .../org/apache/spark/sql/hive/HiveContext.scala | 33 ++--
 .../sql/hive/client/IsolatedClientLoader.scala  | 14 +
 .../apache/spark/sql/hive/test/TestHive.scala   |  9 +-
 3 files changed, 46 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ca60ace/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9d98c36..2733ebd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) 
{
   protected[hive] def hiveMetastoreJars: String =
 getConf(HIVE_METASTORE_JARS, "builtin")
 
+  /**
+   * A comma separated list of class prefixes that should be loaded using the 
classloader that
+   * is shared between Spark SQL and a specific version of Hive. An example of 
classes that should
+   * be shared is JDBC drivers that are needed to talk to the metastore. Other 
classes that need
+   * to be shared are those that interact with classes that are already 
shared.  For example,
+   * custom appenders that are used by log4j.
+   */
+  protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
+getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
+  .split(",").filterNot(_ == "")
+
+  private def jdbcPrefixes = Seq(
+"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", 
"oracle.jdbc").mkString(",")
+
+  /**
+   * A comma separated list of class prefixes that should explicitly be 
reloaded for each version
+   * of Hive that Spark SQL is communicating with.  For example, Hive UDFs 
that are declared in a
+   * prefix that typically would be shared (i.e. org.apache.spark.*)
+   */
+  protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
+getConf("spark.sql.hive.metastore.barrierPrefixes", "")
+  .split(",").filterNot(_ == "")
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
 
@@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends 
SQLContext(sc) {
 version = metaVersion,
 execJars = jars.toSeq,
 config = allConfig,
-isolationOn = true)
+isolationOn = true,
+barrierPrefixes = hiveMetastoreBarrierPrefixes,
+sharedPrefixes = hiveMetastoreSharedPrefixes)
 } else if (hiveMetastoreJars == "maven") {
   // TODO: Support for loading the jars from an already downloaded 
location.
   logInfo(
 s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion 
using maven.")
-  IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
+  IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
 } else {
   // Convert to files and expand any directories.
   val jars =
@@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 version = metaVersion,
 execJars = jars.toSeq,
 config = allConfig,
-isolationOn = true)
+isolationOn = true,
+barrierPrefixes = hiveMetastoreBarrierPrefixes,
+sharedPrefixes = hiveMetastoreSharedPrefixes)
 }
 isolatedLoader.client
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ca60ace/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
 
b/sql/hive/src/main/scal

spark git commit: [SPARK-7491] [SQL] Allow configuration of classloader isolation for hive

2015-05-17 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 53d6ab51b -> a8556086d


[SPARK-7491] [SQL] Allow configuration of classloader isolation for hive

Author: Michael Armbrust 

Closes #6167 from marmbrus/configureIsolation and squashes the following 
commits:

6147cbe [Michael Armbrust] filter other conf
22cc3bc7 [Michael Armbrust] Merge remote-tracking branch 'origin/master' into 
configureIsolation
07476ee [Michael Armbrust] filter empty prefixes
dfdf19c [Michael Armbrust] [SPARK-6906][SQL] Allow configuration of classloader 
isolation for hive

(cherry picked from commit 2ca60ace8f42cf0bd4569d86c86c37a8a2b6a37c)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8556086
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8556086
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8556086

Branch: refs/heads/branch-1.4
Commit: a8556086d33cb993fab0ae2751e31455e6c664ab
Parents: 53d6ab5
Author: Michael Armbrust 
Authored: Sun May 17 12:43:15 2015 -0700
Committer: Michael Armbrust 
Committed: Sun May 17 12:43:26 2015 -0700

--
 .../org/apache/spark/sql/hive/HiveContext.scala | 33 ++--
 .../sql/hive/client/IsolatedClientLoader.scala  | 14 +
 .../apache/spark/sql/hive/test/TestHive.scala   |  9 +-
 3 files changed, 46 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a8556086/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 9d98c36..2733ebd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -122,6 +122,29 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) 
{
   protected[hive] def hiveMetastoreJars: String =
 getConf(HIVE_METASTORE_JARS, "builtin")
 
+  /**
+   * A comma separated list of class prefixes that should be loaded using the 
classloader that
+   * is shared between Spark SQL and a specific version of Hive. An example of 
classes that should
+   * be shared is JDBC drivers that are needed to talk to the metastore. Other 
classes that need
+   * to be shared are those that interact with classes that are already 
shared.  For example,
+   * custom appenders that are used by log4j.
+   */
+  protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
+getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
+  .split(",").filterNot(_ == "")
+
+  private def jdbcPrefixes = Seq(
+"com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", 
"oracle.jdbc").mkString(",")
+
+  /**
+   * A comma separated list of class prefixes that should explicitly be 
reloaded for each version
+   * of Hive that Spark SQL is communicating with.  For example, Hive UDFs 
that are declared in a
+   * prefix that typically would be shared (i.e. org.apache.spark.*)
+   */
+  protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
+getConf("spark.sql.hive.metastore.barrierPrefixes", "")
+  .split(",").filterNot(_ == "")
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
 
@@ -179,12 +202,14 @@ class HiveContext(sc: SparkContext) extends 
SQLContext(sc) {
 version = metaVersion,
 execJars = jars.toSeq,
 config = allConfig,
-isolationOn = true)
+isolationOn = true,
+barrierPrefixes = hiveMetastoreBarrierPrefixes,
+sharedPrefixes = hiveMetastoreSharedPrefixes)
 } else if (hiveMetastoreJars == "maven") {
   // TODO: Support for loading the jars from an already downloaded 
location.
   logInfo(
 s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion 
using maven.")
-  IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig )
+  IsolatedClientLoader.forVersion(hiveMetastoreVersion, allConfig)
 } else {
   // Convert to files and expand any directories.
   val jars =
@@ -210,7 +235,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 version = metaVersion,
 execJars = jars.toSeq,
 config = allConfig,
-isolationOn = true)
+isolationOn = true,
+barrierPrefixes = hiveMetastoreBarrierPrefixes,
+sharedPrefixes = hiveMetastoreSharedPrefixes)
 }
 isolatedLoader.client
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a8556086/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
--
diff --git 
a

spark git commit: [SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] Updates to the Kinesis API

2015-05-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 2ca60ace8 -> ca4257aec


[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] 
Updates to the Kinesis API

SPARK-6514 - Use correct region
SPARK-5960 - Allow AWS Credentials to be directly passed
SPARK-6656 - Specify kinesis application name explicitly
SPARK-7679 - Upgrade to latest KCL and AWS SDK.

Author: Tathagata Das 

Closes #6147 from tdas/kinesis-api-update and squashes the following commits:

f23ea77 [Tathagata Das] Updated versions and updated APIs
373b201 [Tathagata Das] Updated Kinesis API


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca4257ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca4257ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca4257ae

Branch: refs/heads/master
Commit: ca4257aec658aaa87f4f097dd7534033d5f13ddc
Parents: 2ca60ac
Author: Tathagata Das 
Authored: Sun May 17 16:49:07 2015 -0700
Committer: Tathagata Das 
Committed: Sun May 17 16:49:07 2015 -0700

--
 .../kinesis/KinesisCheckpointState.scala|   2 +-
 .../streaming/kinesis/KinesisReceiver.scala | 152 ++-
 .../kinesis/KinesisRecordProcessor.scala|  32 ++-
 .../spark/streaming/kinesis/KinesisUtils.scala  | 263 ---
 .../kinesis/KinesisReceiverSuite.scala  |  15 +-
 pom.xml |   4 +-
 6 files changed, 348 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
--
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 588e86a..1c9b0c2 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
   /**
* Advance the checkpoint clock by the checkpoint interval.
*/
-  def advanceCheckpoint() = {
+  def advanceCheckpoint(): Unit = {
 checkpointClock.advance(checkpointInterval.milliseconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ca4257ae/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
--
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index a7fe447..01608fb 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -16,32 +16,31 @@
  */
 package org.apache.spark.streaming.kinesis
 
-import java.net.InetAddress
 import java.util.UUID
 
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, 
BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorFactory}
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream,
 KinesisClientLibConfiguration, Worker}
+
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
 
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
 
 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as 
described here:
  * https://github.com/awslabs/amazon-kinesis-client
- * This is a custom receiver used with 
StreamingContext.receiverStream(Receiver) 
- *   as desc

spark git commit: [SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] Updates to the Kinesis API

2015-05-17 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 a8556086d -> e0632ffaf


[SPARK-6514] [SPARK-5960] [SPARK-6656] [SPARK-7679] [STREAMING] [KINESIS] 
Updates to the Kinesis API

SPARK-6514 - Use correct region
SPARK-5960 - Allow AWS Credentials to be directly passed
SPARK-6656 - Specify kinesis application name explicitly
SPARK-7679 - Upgrade to latest KCL and AWS SDK.

Author: Tathagata Das 

Closes #6147 from tdas/kinesis-api-update and squashes the following commits:

f23ea77 [Tathagata Das] Updated versions and updated APIs
373b201 [Tathagata Das] Updated Kinesis API

(cherry picked from commit ca4257aec658aaa87f4f097dd7534033d5f13ddc)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0632ffa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0632ffa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0632ffa

Branch: refs/heads/branch-1.4
Commit: e0632ffafd7c4bc6c87514f591fd6eb17f68cfed
Parents: a855608
Author: Tathagata Das 
Authored: Sun May 17 16:49:07 2015 -0700
Committer: Tathagata Das 
Committed: Sun May 17 16:49:31 2015 -0700

--
 .../kinesis/KinesisCheckpointState.scala|   2 +-
 .../streaming/kinesis/KinesisReceiver.scala | 152 ++-
 .../kinesis/KinesisRecordProcessor.scala|  32 ++-
 .../spark/streaming/kinesis/KinesisUtils.scala  | 263 ---
 .../kinesis/KinesisReceiverSuite.scala  |  15 +-
 pom.xml |   4 +-
 6 files changed, 348 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e0632ffa/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
--
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
index 588e86a..1c9b0c2 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisCheckpointState.scala
@@ -48,7 +48,7 @@ private[kinesis] class KinesisCheckpointState(
   /**
* Advance the checkpoint clock by the checkpoint interval.
*/
-  def advanceCheckpoint() = {
+  def advanceCheckpoint(): Unit = {
 checkpointClock.advance(checkpointInterval.milliseconds)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e0632ffa/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
--
diff --git 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index a7fe447..01608fb 100644
--- 
a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ 
b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -16,32 +16,31 @@
  */
 package org.apache.spark.streaming.kinesis
 
-import java.net.InetAddress
 import java.util.UUID
 
+import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, 
BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
+import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, 
IRecordProcessorFactory}
+import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream,
 KinesisClientLibConfiguration, Worker}
+
 import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.Duration
 import org.apache.spark.streaming.receiver.Receiver
 import org.apache.spark.util.Utils
 
-import com.amazonaws.auth.AWSCredentialsProvider
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
-import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
-import 
com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
-import 
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker
+
+private[kinesis]
+case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
+  extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
 
 /**
  * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
  * This implementation relies on the Kinesis Client Library (KCL) Worker as 
described here:
  * https://github.com/awslabs/amazon-kine

spark git commit: [SQL] [MINOR] use catalyst type converter in ScalaUdf

2015-05-17 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master ca4257aec -> 2f22424e9


[SQL] [MINOR] use catalyst type converter in ScalaUdf

It's a follow-up of https://github.com/apache/spark/pull/5154: we can speed up 
Scala UDF evaluation by creating the type converter in advance.
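
As a sketch of the idea (illustrative names only, not the actual Spark code),
the converter is built once per expression and reused for every row, rather
than re-resolving the conversion inside each eval call:

object ConverterSketch {
  // Imagine an expensive match on the output type, performed exactly once.
  def buildConverter(dataType: String): Any => Any = dataType match {
    case "string" => (v: Any) => v.toString
    case _        => identity[Any] _
  }

  class SimpleUdf(f: Any => Any, dataType: String) {
    private val converter = buildConverter(dataType) // hoisted out of the per-row path
    def eval(input: Any): Any = converter(f(input))  // each row only applies the converter
  }
}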

Author: Wenchen Fan 

Closes #6182 from cloud-fan/tmp and squashes the following commits:

241cfe9 [Wenchen Fan] use converter in ScalaUdf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f22424e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f22424e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f22424e

Branch: refs/heads/master
Commit: 2f22424e9f6624097b292cb70e00787b69d80718
Parents: ca4257a
Author: Wenchen Fan 
Authored: Sun May 17 16:51:57 2015 -0700
Committer: Yin Huai 
Committed: Sun May 17 16:51:57 2015 -0700

--
 .../org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2f22424e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 9a77ca6..d22eb10 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -956,7 +956,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, 
children: Seq[Expressi
   }
 
   // scalastyle:on
-
-  override def eval(input: Row): Any = 
CatalystTypeConverters.convertToCatalyst(f(input), dataType)
+  val converter = CatalystTypeConverters.createToCatalystConverter(dataType)
+  override def eval(input: Row): Any = converter(f(input))
 
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SQL] [MINOR] use catalyst type converter in ScalaUdf

2015-05-17 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 e0632ffaf -> be66d1924


[SQL] [MINOR] use catalyst type converter in ScalaUdf

It's a follow-up of https://github.com/apache/spark/pull/5154: we can speed up 
Scala UDF evaluation by creating the type converter in advance.

Author: Wenchen Fan 

Closes #6182 from cloud-fan/tmp and squashes the following commits:

241cfe9 [Wenchen Fan] use converter in ScalaUdf

(cherry picked from commit 2f22424e9f6624097b292cb70e00787b69d80718)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be66d192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be66d192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be66d192

Branch: refs/heads/branch-1.4
Commit: be66d1924edc5c99987c80d445f34a690c3789a9
Parents: e0632ff
Author: Wenchen Fan 
Authored: Sun May 17 16:51:57 2015 -0700
Committer: Yin Huai 
Committed: Sun May 17 16:52:21 2015 -0700

--
 .../org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/be66d192/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
index 9a77ca6..d22eb10 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
@@ -956,7 +956,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, 
children: Seq[Expressi
   }
 
   // scalastyle:on
-
-  override def eval(input: Row): Any = 
CatalystTypeConverters.convertToCatalyst(f(input), dataType)
+  val converter = CatalystTypeConverters.createToCatalystConverter(dataType)
+  override def eval(input: Row): Any = converter(f(input))
 
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"

2015-05-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 2f22424e9 -> ff71d34e0


[SPARK-7693][Core] Remove "import 
scala.concurrent.ExecutionContext.Implicits.global"

Learnt a lesson from SPARK-7655: Spark should avoid using 
`scala.concurrent.ExecutionContext.Implicits.global`, because the user may 
submit blocking actions to it and exhaust all of its threads, which could 
crash Spark. So Spark should always use its own thread pools for safety.

This PR removes all usages of 
`scala.concurrent.ExecutionContext.Implicits.global` and replaces them with 
dedicated thread pools.
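
For illustration only, a minimal sketch of the replacement pattern (not the
ThreadUtils helpers used in this patch): the component owns a bounded,
explicitly created thread pool and runs its Futures on it, so blocking user
actions cannot starve a shared global pool.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object DedicatedPoolSketch {
  // Bounded pool owned by this component instead of Implicits.global.
  private val pool = Executors.newFixedThreadPool(8)
  implicit val futureExecutionContext: ExecutionContext =
    ExecutionContext.fromExecutorService(pool)

  // Work submitted here can block without starving other components.
  def submit(): Future[Int] = Future {
    // potentially blocking action
    42
  }

  // Shut the pool down when the owning component stops.
  def stop(): Unit = pool.shutdown()
}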

Author: zsxwing 

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import 
scala.concurrent.ExecutionContext.Implicits.global"


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff71d34e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff71d34e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff71d34e

Branch: refs/heads/master
Commit: ff71d34e00b64d70f671f9bf3e63aec39cd525e5
Parents: 2f22424
Author: zsxwing 
Authored: Sun May 17 20:37:19 2015 -0700
Committer: Reynold Xin 
Committed: Sun May 17 20:37:19 2015 -0700

--
 .../executor/CoarseGrainedExecutorBackend.scala |  9 +++---
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 13 +++--
 .../org/apache/spark/storage/BlockManager.scala | 17 +---
 .../spark/storage/BlockManagerMaster.scala  | 29 
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../streaming/receiver/ReceiverSupervisor.scala | 14 +++---
 6 files changed, 58 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ff71d34e/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed159de..f3a26f5 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
 override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = 
env.closureSerializer.newInstance()
 
   override def onStart() {
-import scala.concurrent.ExecutionContext.Implicits.global
 logInfo("Connecting to driver: " + driverUrl)
 rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+  // This is a very fast action so we can use "ThreadUtils.sameThread"
   driver = Some(ref)
   ref.ask[RegisteredExecutor.type](
 RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-} onComplete {
+}(ThreadUtils.sameThread).onComplete {
+  // This is a very fast action so we can use "ThreadUtils.sameThread"
   case Success(msg) => Utils.tryLogNonFatalError {
 Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
   }
   case Failure(e) => logError(s"Cannot register with driver: $driverUrl", 
e)
-}
+}(ThreadUtils.sameThread)
   }
 
   def extractLogUrls: Map[String, String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ff71d34e/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala 
b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ec18534..bbf1b83 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -19,8 +19,10 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.util.ThreadUtils
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T])

spark git commit: [SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"

2015-05-17 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 be66d1924 -> 2a42d2d8f


[SPARK-7693][Core] Remove "import 
scala.concurrent.ExecutionContext.Implicits.global"

Learnt a lesson from SPARK-7655: Spark should avoid using 
`scala.concurrent.ExecutionContext.Implicits.global`, because the user may 
submit blocking actions to it and exhaust all of its threads, which could 
crash Spark. So Spark should always use its own thread pools for safety.

This PR removes all usages of 
`scala.concurrent.ExecutionContext.Implicits.global` and replaces them with 
dedicated thread pools.

Author: zsxwing 

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import 
scala.concurrent.ExecutionContext.Implicits.global"

(cherry picked from commit ff71d34e00b64d70f671f9bf3e63aec39cd525e5)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a42d2d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a42d2d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a42d2d8

Branch: refs/heads/branch-1.4
Commit: 2a42d2d8f26e3e66ab8c926e952a20a3900ca7f3
Parents: be66d19
Author: zsxwing 
Authored: Sun May 17 20:37:19 2015 -0700
Committer: Reynold Xin 
Committed: Sun May 17 20:37:27 2015 -0700

--
 .../executor/CoarseGrainedExecutorBackend.scala |  9 +++---
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 13 +++--
 .../org/apache/spark/storage/BlockManager.scala | 17 +---
 .../spark/storage/BlockManagerMaster.scala  | 29 
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../streaming/receiver/ReceiverSupervisor.scala | 14 +++---
 6 files changed, 58 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ed159de..f3a26f5 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
 private[spark] class CoarseGrainedExecutorBackend(
 override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = 
env.closureSerializer.newInstance()
 
   override def onStart() {
-import scala.concurrent.ExecutionContext.Implicits.global
 logInfo("Connecting to driver: " + driverUrl)
 rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+  // This is a very fast action so we can use "ThreadUtils.sameThread"
   driver = Some(ref)
   ref.ask[RegisteredExecutor.type](
 RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-} onComplete {
+}(ThreadUtils.sameThread).onComplete {
+  // This is a very fast action so we can use "ThreadUtils.sameThread"
   case Success(msg) => Utils.tryLogNonFatalError {
 Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
   }
   case Failure(e) => logError(s"Cannot register with driver: $driverUrl", 
e)
-}
+}(ThreadUtils.sameThread)
   }
 
   def extractLogUrls: Map[String, String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2a42d2d8/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala 
b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ec18534..bbf1b83 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -19,8 +19,10 @@ package org.apache.spark.rdd
 
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.util.ThreadUtils
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Comp

spark git commit: [SPARK-7694] [MLLIB] Use getOrElse for getting the threshold of LR model

2015-05-17 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master ff71d34e0 -> 775e6f990


[SPARK-7694] [MLLIB] Use getOrElse for getting the threshold of LR model

The `toString` method of `LogisticRegressionModel` calls the `get` method on an 
Option (`threshold`) without a safeguard. In spark-shell, code such as `val 
model = algorithm.run(data).clearThreshold()` in the LBFGS example will fail, 
because `toString` is called right after `clearThreshold()` to show the 
result in the REPL.
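
A tiny illustration of the difference (hypothetical REPL snippet, not Spark code):

// threshold is None after clearThreshold(), so .get would throw.
val threshold: Option[Double] = None
// threshold.get                        // NoSuchElementException
val shown = threshold.getOrElse("None") // the safe fallback used in this fix
println(s"threshold = $shown")          // prints: threshold = None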

Author: Shuo Xiang 

Closes #6224 from coderxiang/getorelse and squashes the following commits:

d5f53c9 [Shuo Xiang] use getOrElse for getting the threshold of LR model
5f109b4 [Shuo Xiang] Merge remote-tracking branch 'upstream/master'
c5c5bfe [Shuo Xiang] Merge remote-tracking branch 'upstream/master'
98804c9 [Shuo Xiang] fix bug in topBykey and update test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/775e6f99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/775e6f99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/775e6f99

Branch: refs/heads/master
Commit: 775e6f9909d4495cbc11c377508b43482d782742
Parents: ff71d34
Author: Shuo Xiang 
Authored: Sun May 17 21:16:52 2015 -0700
Committer: Xiangrui Meng 
Committed: Sun May 17 21:16:52 2015 -0700

--
 .../org/apache/spark/mllib/classification/LogisticRegression.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/775e6f99/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index bd2e907..2df4d21 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -163,7 +163,7 @@ class LogisticRegressionModel (
   override protected def formatVersion: String = "1.0"
 
   override def toString: String = {
-s"${super.toString}, numClasses = ${numClasses}, threshold = 
${threshold.get}"
+s"${super.toString}, numClasses = ${numClasses}, threshold = 
${threshold.getOrElse("None")}"
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-7694] [MLLIB] Use getOrElse for getting the threshold of LR model

2015-05-17 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 2a42d2d8f -> 0b6bc8a23


[SPARK-7694] [MLLIB] Use getOrElse for getting the threshold of LR model

The `toString` method of `LogisticRegressionModel` calls the `get` method on an 
Option (`threshold`) without a safeguard. In spark-shell, code such as `val 
model = algorithm.run(data).clearThreshold()` in the LBFGS example will fail, 
because `toString` is called right after `clearThreshold()` to show the 
result in the REPL.

Author: Shuo Xiang 

Closes #6224 from coderxiang/getorelse and squashes the following commits:

d5f53c9 [Shuo Xiang] use getOrElse for getting the threshold of LR model
5f109b4 [Shuo Xiang] Merge remote-tracking branch 'upstream/master'
c5c5bfe [Shuo Xiang] Merge remote-tracking branch 'upstream/master'
98804c9 [Shuo Xiang] fix bug in topBykey and update test

(cherry picked from commit 775e6f9909d4495cbc11c377508b43482d782742)
Signed-off-by: Xiangrui Meng 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b6bc8a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b6bc8a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b6bc8a2

Branch: refs/heads/branch-1.4
Commit: 0b6bc8a2394a26203d7be9ab53cc9dc5970d41a6
Parents: 2a42d2d
Author: Shuo Xiang 
Authored: Sun May 17 21:16:52 2015 -0700
Committer: Xiangrui Meng 
Committed: Sun May 17 21:16:59 2015 -0700

--
 .../org/apache/spark/mllib/classification/LogisticRegression.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0b6bc8a2/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index bd2e907..2df4d21 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -163,7 +163,7 @@ class LogisticRegressionModel (
   override protected def formatVersion: String = "1.0"
 
   override def toString: String = {
-s"${super.toString}, numClasses = ${numClasses}, threshold = 
${threshold.get}"
+s"${super.toString}, numClasses = ${numClasses}, threshold = 
${threshold.getOrElse("None")}"
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org