spark git commit: [SPARK-11522][SQL] input_file_name() returns "" for external tables

2015-11-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 a0f9cd77a -> c37ed52ec


[SPARK-11522][SQL] input_file_name() returns "" for external tables

When computing a partition for a non-Parquet relation, `HadoopRDD.compute` is used,
but unlike `SqlNewHadoopRDD.compute` it does not set the thread-local variable
`inputFileName`. Yet when the file name is requested, `SqlNewHadoopRDD.inputFileName`
is consulted, which is empty at that point.
Setting `inputFileName` in `HadoopRDD.compute` as well resolves this issue.
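
For illustration, a minimal way to hit this code path (the table name is hypothetical; any Hive external table stored as text files is scanned through `HadoopRDD` rather than `SqlNewHadoopRDD`):

```
// Hypothetical external Hive table stored as text files; its scan goes through
// HadoopRDD, so before this patch input_file_name() evaluated to "".
val files = sqlContext.sql("SELECT DISTINCT input_file_name() AS source_file FROM ext_logs")

// After the patch each row carries the path of the split it was read from.
files.show(false)
```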

Author: xin Wu 

Closes #9542 from xwu0226/SPARK-11522.

(cherry picked from commit 0e79604aed116bdcb40e03553a2d103b5b1cdbae)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c37ed52e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c37ed52e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c37ed52e

Branch: refs/heads/branch-1.6
Commit: c37ed52ecd38a400f99cc82f07440e455b772db2
Parents: a0f9cd7
Author: xin Wu 
Authored: Mon Nov 16 08:10:48 2015 -0800
Committer: Yin Huai 
Committed: Mon Nov 16 08:10:59 2015 -0800

--
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |  7 ++
 .../spark/sql/hive/execution/HiveUDFSuite.scala | 93 +++-
 2 files changed, 98 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 0453614..7db5834 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -213,6 +213,12 @@ class HadoopRDD[K, V](
 
       val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
 
+      // Sets the thread local variable for the file's name
+      split.inputSplit.value match {
+        case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDD.unsetInputFileName()
+      }
+
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
       // creating RecordReader, because RecordReader's constructor might read some bytes
       val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
@@ -250,6 +256,7 @@ class HadoopRDD[K, V](
 
       override def close() {
         if (reader != null) {
+          SqlNewHadoopRDD.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic

http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 5ab477e..9deb1a6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.io.{DataInput, DataOutput}
+import java.io.{PrintWriter, File, DataInput, DataOutput}
 import java.util.{ArrayList, Arrays, Properties}
 
 import org.apache.hadoop.conf.Configuration
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn
 import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
 import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats}
 import org.apache.hadoop.io.Writable
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.util.Utils
@@ -44,7 +45,7 @@ case class ListStringCaseClass(l: Seq[String])
 /**
  * A test suite for Hive custom UDFs.
  */
-class HiveUDFSuite extends QueryTest with TestHiveSingleton {
+class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
 
   import hiveContext.{udf, sql}
   import hiveContext.implicits._
@@ -348,6 +349,94 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton {
 
 sqlContext.dropTempTable("testUDF")
   }
+
+  test("SPARK-11522 

spark git commit: [SPARK-11044][SQL] Parquet writer version fixed as version1

2015-11-16 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 42de5253f -> 7f8eb3bf6


[SPARK-11044][SQL] Parquet writer version fixed as version1

https://issues.apache.org/jira/browse/SPARK-11044

Spark writes Parquet files only with writer version1, ignoring the writer version given by the user.

So, in this PR, the user-supplied writer version is kept if given; otherwise version1 is set as the default.
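
As a usage sketch (output path hypothetical, configuration keys as in the test below): after this change a writer version set on the Hadoop configuration is no longer overwritten by `setSchema`.

```
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

// Request the version2 Parquet writer; setSchema now uses setIfUnset,
// so this user-supplied value is kept instead of being reset to PARQUET_1_0.
sqlContext.sparkContext.hadoopConfiguration.set(
  ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)

sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
  .coalesce(1).write.mode("overwrite").parquet("/tmp/parquet-v2-example")
```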

Author: hyukjinkwon 
Author: HyukjinKwon 

Closes #9060 from HyukjinKwon/SPARK-11044.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f8eb3bf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f8eb3bf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f8eb3bf

Branch: refs/heads/master
Commit: 7f8eb3bf6ed64eefc5472f5c5fb02e2db1e3f618
Parents: 42de525
Author: hyukjinkwon 
Authored: Mon Nov 16 21:30:10 2015 +0800
Committer: Cheng Lian 
Committed: Mon Nov 16 21:30:10 2015 +0800

--
 .../parquet/CatalystWriteSupport.scala  |  2 +-
 .../datasources/parquet/ParquetIOSuite.scala| 34 
 2 files changed, 35 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f8eb3bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 483363d..6862dea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
 schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
 configuration.set(SPARK_ROW_SCHEMA, schema.json)
-configuration.set(
+configuration.setIfUnset(
   ParquetOutputFormat.WRITER_VERSION,
   ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/7f8eb3bf/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 78df363..2aa5dca 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Collections
 
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-11044 Parquet writer version fixed as version1 ") {
+    // For dictionary encoding, Parquet changes the encoding types according to its writer
+    // version. So, this test checks one of the encoding types in order to ensure that
+    // the file is written with writer version2.
+    withTempPath { dir =>
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        // Write a Parquet file with writer version2.
+        hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      }

spark git commit: [SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c37ed52ec -> 949c9b7c6


[SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds

code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```

It would be good to add a regression test for this, but the reproduction has to change the default
timezone, and even if we change it back afterwards, the `lazy val defaultTimeZone` in
`DateTimeUtils` has already been evaluated and keeps the old value.
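
For reference, a self-contained version of the reproduction above (same snippet plus the imports it needs; Asia/Shanghai used a UTC offset with a seconds component, roughly +08:05:43, before 1901, which is why the seconds field depends on the zone):

```
import java.sql.Timestamp
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils.{fromJavaTimestamp, getSeconds}

TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
// Failed before this patch (getSeconds ignored the zone offset that
// getHours/getMinutes already applied); passes after.
assert(getSeconds(us) == t.getSeconds)
```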

Author: Wenchen Fan 

Closes #9728 from cloud-fan/seconds.

(cherry picked from commit 06f1fdba6d1425afddfc1d45a20dbe9bede15e7a)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/949c9b7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/949c9b7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/949c9b7c

Branch: refs/heads/branch-1.6
Commit: 949c9b7c660c37bbe543adb260380afb1258089e
Parents: c37ed52
Author: Wenchen Fan 
Authored: Mon Nov 16 08:58:40 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 08:58:50 2015 -0800

--
 .../spark/sql/catalyst/util/DateTimeUtils.scala   | 14 --
 .../spark/sql/catalyst/util/DateTimeUtilsSuite.scala  |  2 +-
 2 files changed, 9 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/949c9b7c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index deff8a5..8fb3f41 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -395,16 +395,19 @@ object DateTimeUtils {
   /**
    * Returns the microseconds since year zero (-17999) from microseconds since epoch.
    */
-  def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = {
+  private def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = {
     microsec + toYearZero * MICROS_PER_DAY
   }
 
+  private def localTimestamp(microsec: SQLTimestamp): SQLTimestamp = {
+    absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L
+  }
+
   /**
    * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds.
    */
   def getHours(microsec: SQLTimestamp): Int = {
-    val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L
-    ((localTs / MICROS_PER_SECOND / 3600) % 24).toInt
+    ((localTimestamp(microsec) / MICROS_PER_SECOND / 3600) % 24).toInt
   }
 
   /**
@@ -412,8 +415,7 @@
    * microseconds.
    */
   def getMinutes(microsec: SQLTimestamp): Int = {
-    val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L
-    ((localTs / MICROS_PER_SECOND / 60) % 60).toInt
+    ((localTimestamp(microsec) / MICROS_PER_SECOND / 60) % 60).toInt
   }
 
   /**
@@ -421,7 +423,7 @@
    * microseconds.
    */
   def getSeconds(microsec: SQLTimestamp): Int = {
-    ((absoluteMicroSecond(microsec) / MICROS_PER_SECOND) % 60).toInt
+    ((localTimestamp(microsec) / MICROS_PER_SECOND) % 60).toInt
   }
 
   private[this] def isLeapYear(year: Int): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/949c9b7c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 64d15e6..60d4542 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -358,7 +358,7 @@ class DateTimeUtilsSuite extends SparkFunSuite {
 assert(getSeconds(c.getTimeInMillis * 1000) === 9)
   }
 
-  test("hours / miniute / seconds") {
+  test("hours / minutes / seconds") {
 Seq(Timestamp.valueOf("2015-06-11 10:12:35.789"),
   Timestamp.valueOf("2015-06-11 20:13:40.789"),
   Timestamp.valueOf("1900-06-11 12:14:50.789"),



spark git commit: [SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 0e79604ae -> 06f1fdba6


[SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds

code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```

It would be good to add a regression test for this, but the reproduction has to change the default
timezone, and even if we change it back afterwards, the `lazy val defaultTimeZone` in
`DateTimeUtils` has already been evaluated and keeps the old value.

Author: Wenchen Fan 

Closes #9728 from cloud-fan/seconds.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06f1fdba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06f1fdba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06f1fdba

Branch: refs/heads/master
Commit: 06f1fdba6d1425afddfc1d45a20dbe9bede15e7a
Parents: 0e79604
Author: Wenchen Fan 
Authored: Mon Nov 16 08:58:40 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 08:58:40 2015 -0800

--
 .../spark/sql/catalyst/util/DateTimeUtils.scala   | 14 --
 .../spark/sql/catalyst/util/DateTimeUtilsSuite.scala  |  2 +-
 2 files changed, 9 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/06f1fdba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index deff8a5..8fb3f41 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -395,16 +395,19 @@ object DateTimeUtils {
   /**
    * Returns the microseconds since year zero (-17999) from microseconds since epoch.
    */
-  def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = {
+  private def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = {
     microsec + toYearZero * MICROS_PER_DAY
   }
 
+  private def localTimestamp(microsec: SQLTimestamp): SQLTimestamp = {
+    absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L
+  }
+
   /**
    * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds.
    */
   def getHours(microsec: SQLTimestamp): Int = {
-    val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L
-    ((localTs / MICROS_PER_SECOND / 3600) % 24).toInt
+    ((localTimestamp(microsec) / MICROS_PER_SECOND / 3600) % 24).toInt
   }
 
   /**
@@ -412,8 +415,7 @@
    * microseconds.
    */
   def getMinutes(microsec: SQLTimestamp): Int = {
-    val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L
-    ((localTs / MICROS_PER_SECOND / 60) % 60).toInt
+    ((localTimestamp(microsec) / MICROS_PER_SECOND / 60) % 60).toInt
   }
 
   /**
@@ -421,7 +423,7 @@
    * microseconds.
    */
   def getSeconds(microsec: SQLTimestamp): Int = {
-    ((absoluteMicroSecond(microsec) / MICROS_PER_SECOND) % 60).toInt
+    ((localTimestamp(microsec) / MICROS_PER_SECOND) % 60).toInt
   }
 
   private[this] def isLeapYear(year: Int): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/06f1fdba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 64d15e6..60d4542 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -358,7 +358,7 @@ class DateTimeUtilsSuite extends SparkFunSuite {
 assert(getSeconds(c.getTimeInMillis * 1000) === 9)
   }
 
-  test("hours / miniute / seconds") {
+  test("hours / minutes / seconds") {
 Seq(Timestamp.valueOf("2015-06-11 10:12:35.789"),
   Timestamp.valueOf("2015-06-11 20:13:40.789"),
   Timestamp.valueOf("1900-06-11 12:14:50.789"),





spark git commit: [SPARK-11692][SQL] Support for Parquet logical types, JSON and BSON (embedded types)

2015-11-16 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 7f8eb3bf6 -> e388b39d1


[SPARK-11692][SQL] Support for Parquet logical types, JSON and BSON (embedded types)

Parquet supports JSON and BSON as logical types. They are represented internally as string (UTF-8) for JSON and as binary for BSON.

I searched a bit and found that Apache Drill also supports both in this way, [link](https://drill.apache.org/docs/parquet-format/).
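
The resulting type mapping, as exercised by the new test below (read path is a sketch; the file path is hypothetical):

```
// Parquet logical type annotations and the Spark SQL types they convert to:
//   required binary a (JSON);  ->  StringType  (UTF-8 string)
//   required binary b (BSON);  ->  BinaryType
import org.apache.spark.sql.types.{BinaryType, StringType}

val schema = sqlContext.read.parquet("/tmp/json-bson-example.parquet").schema
assert(schema("a").dataType == StringType)
assert(schema("b").dataType == BinaryType)
```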

Author: hyukjinkwon 
Author: Hyukjin Kwon 

Closes #9658 from HyukjinKwon/SPARK-11692.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e388b39d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e388b39d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e388b39d

Branch: refs/heads/master
Commit: e388b39d10fc269cdd3d630ea7d4ae80fd0efa97
Parents: 7f8eb3b
Author: hyukjinkwon 
Authored: Mon Nov 16 21:59:33 2015 +0800
Committer: Cheng Lian 
Committed: Mon Nov 16 21:59:33 2015 +0800

--
 .../parquet/CatalystSchemaConverter.scala   |  3 ++-
 .../datasources/parquet/ParquetIOSuite.scala| 25 
 2 files changed, 27 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e388b39d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index f28a18e..5f9f908 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -170,9 +170,10 @@ private[parquet] class CatalystSchemaConverter(
 
   case BINARY =>
 originalType match {
-  case UTF8 | ENUM => StringType
+  case UTF8 | ENUM | JSON => StringType
   case null if assumeBinaryIsString => StringType
   case null => BinaryType
+  case BSON => BinaryType
   case DECIMAL => makeDecimalType()
   case _ => illegalType()
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e388b39d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 2aa5dca..a148fac 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -259,6 +259,31 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 }
   }
 
+  test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded 
types)") {
+val parquetSchema = MessageTypeParser.parseMessageType(
+  """message root {
+|  required binary a(JSON);
+|  required binary b(BSON);
+|}
+  """.stripMargin)
+
+withTempPath { location =>
+  val extraMetadata = Map.empty[String, String].asJava
+  val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, 
"Spark")
+  val path = new Path(location.getCanonicalPath)
+  val footer = List(
+new Footer(path, new ParquetMetadata(fileMetadata, 
Collections.emptyList()))
+  ).asJava
+
+  ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, 
path, footer)
+
+  val jsonDataType = 
sqlContext.read.parquet(path.toString).schema(0).dataType
+  assert(jsonDataType === StringType)
+  val bsonDataType = 
sqlContext.read.parquet(path.toString).schema(1).dataType
+  assert(bsonDataType === BinaryType)
+}
+  }
+
   test("compression codec") {
 def compressionCodecFor(path: String, codecName: String): String = {
   val codecs = for {





spark git commit: [SPARK-10181][SQL] Do kerberos login for credentials during hive client initialization

2015-11-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 bf79a171e -> 51fc152b7


[SPARK-10181][SQL] Do kerberos login for credentials during hive client 
initialization

On driver start-up, UserGroupInformation.loginUserFromKeytab is called with the principal and
keytab passed in, so the static field UserGroupInformation.loginUser is set to that principal with
the kerberos credentials saved in its private credential set, and all threads within the driver
process are supposed to see and use these login credentials to authenticate with Hive and Hadoop.
However, because of IsolatedClientLoader, the UserGroupInformation class is not shared with the
hive metastore clients; it is loaded separately and therefore cannot see the kerberos login
credentials prepared in the main thread.

The first proposed fix would cause other classloader conflict errors and is not an appropriate
solution. This change instead does the kerberos login during hive client initialization, which
makes the credentials available to that particular hive client instance.
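
A minimal sketch of the idea (not the exact ClientWrapper code, whose diff is cut off below; `sparkConf` is assumed to be the client's SparkConf): re-do the kerberos login inside the isolated classloader, using the principal and keytab that SparkSubmit now forwards.

```
import org.apache.hadoop.security.UserGroupInformation

// Sketch: during hive client initialization, log in again with the keytab so the
// isolated copy of UserGroupInformation holds valid kerberos credentials.
(sparkConf.getOption("spark.yarn.principal"), sparkConf.getOption("spark.yarn.keytab")) match {
  case (Some(principal), Some(keytab)) =>
    UserGroupInformation.loginUserFromKeytab(principal, keytab)
  case _ => // no kerberos credentials configured; nothing to do
}
```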

 yhuai Please take a look and let me know. If you are not the right person to 
talk to, could you point me to someone responsible for this?

Author: Yu Gao 
Author: gaoyu 
Author: Yu Gao 

Closes #9272 from yolandagao/master.

(cherry picked from commit 72c1d68b4ab6acb3f85971e10947caabb4bd846d)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51fc152b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51fc152b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51fc152b

Branch: refs/heads/branch-1.5
Commit: 51fc152b7e194282940eab29fe0069edef8a67a5
Parents: bf79a17
Author: Yu Gao 
Authored: Sun Nov 15 14:53:59 2015 -0800
Committer: Yin Huai 
Committed: Mon Nov 16 10:29:39 2015 -0800

--
 .../org/apache/spark/deploy/SparkSubmit.scala   | 17 +++---
 .../spark/sql/hive/client/ClientWrapper.scala   | 24 +++-
 2 files changed, 37 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/51fc152b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index fefbba9..dc555cb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 
-import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
+import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION}
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
@@ -521,8 +521,19 @@ object SparkSubmit {
         sysProps.put("spark.yarn.isPython", "true")
       }
       if (args.principal != null) {
-        require(args.keytab != null, "Keytab must be specified when the keytab is specified")
-        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        require(args.keytab != null, "Keytab must be specified when principal is specified")
+        if (!new File(args.keytab).exists()) {
+          throw new SparkException(s"Keytab file: ${args.keytab} does not exist")
+        } else {
+          // Add keytab and principal configurations in sysProps to make them available
+          // for later use; e.g. in spark sql, the isolated class loader used to talk
+          // to HiveMetastore will use these settings. They will be set as Java system
+          // properties and then loaded by SparkConf
+          sysProps.put("spark.yarn.keytab", args.keytab)
+          sysProps.put("spark.yarn.principal", args.principal)
+
+          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51fc152b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index f45747a..436f2e5 100644
--- 

spark git commit: [SPARK-11044][SQL] Parquet writer version fixed as version1

2015-11-16 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1887fa228 -> f14fb291d


[SPARK-11044][SQL] Parquet writer version fixed as version1

https://issues.apache.org/jira/browse/SPARK-11044

Spark writes Parquet files only with writer version1, ignoring the writer version given by the user.

So, in this PR, the user-supplied writer version is kept if given; otherwise version1 is set as the default.

Author: hyukjinkwon 
Author: HyukjinKwon 

Closes #9060 from HyukjinKwon/SPARK-11044.

(cherry picked from commit 7f8eb3bf6ed64eefc5472f5c5fb02e2db1e3f618)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f14fb291
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f14fb291
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f14fb291

Branch: refs/heads/branch-1.6
Commit: f14fb291d822720d8b578db0bdb656fb6c9ce590
Parents: 1887fa2
Author: hyukjinkwon 
Authored: Mon Nov 16 21:30:10 2015 +0800
Committer: Cheng Lian 
Committed: Tue Nov 17 03:09:33 2015 +0800

--
 .../parquet/CatalystWriteSupport.scala  |  2 +-
 .../datasources/parquet/ParquetIOSuite.scala| 34 
 2 files changed, 35 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f14fb291/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 483363d..6862dea 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
 schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
 configuration.set(SPARK_ROW_SCHEMA, schema.json)
-configuration.set(
+configuration.setIfUnset(
   ParquetOutputFormat.WRITER_VERSION,
   ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f14fb291/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 78df363..2aa5dca 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Collections
 
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-11044 Parquet writer version fixed as version1 ") {
+    // For dictionary encoding, Parquet changes the encoding types according to its writer
+    // version. So, this test checks one of the encoding types in order to ensure that
+    // the file is written with writer version2.
+    withTempPath { dir =>
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        // Write a Parquet file with writer version2.
+        hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true)
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 38fe092ff -> fbe65c592


[SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

These two are very similar, so we can consolidate them into one.

Also adds tests for the consolidated code path and fixes a bug.
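
For context, the public entry point being consolidated (a usage sketch of the existing API; the schema comment shows roughly what to expect):

```
import org.apache.spark.sql.Encoders

// Encoders.tuple now simply delegates to ExpressionEncoder.tuple, so the
// Java-friendly and internal paths build the composite encoder the same way.
val pairEncoder = Encoders.tuple(Encoders.INT, Encoders.STRING)
// pairEncoder.schema is roughly: StructType(StructField(_1, IntegerType), StructField(_2, StringType))
println(pairEncoder.schema)
```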

Author: Wenchen Fan 

Closes #9729 from cloud-fan/tuple.

(cherry picked from commit b1a9662623951079e80bd7498e064c4cae4977e9)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe65c59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe65c59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe65c59

Branch: refs/heads/branch-1.6
Commit: fbe65c5924d2f5f4789bf54a1da0a7b6bbf1eb42
Parents: 38fe092
Author: Wenchen Fan 
Authored: Mon Nov 16 12:45:34 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 12:46:26 2015 -0800

--
 .../scala/org/apache/spark/sql/Encoder.scala|  95 -
 .../catalyst/encoders/ExpressionEncoder.scala   | 104 ++-
 .../catalyst/encoders/ProductEncoderSuite.scala |  29 ++
 3 files changed, 108 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbe65c59/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 5f619d6..c8b017e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
@@ -49,83 +47,34 @@ object Encoders {
   def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
   def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
 
-  def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = {
-    tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2)]]
+  def tuple[T1, T2](
+      e1: Encoder[T1],
+      e2: Encoder[T2]): Encoder[(T1, T2)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
   }
 
   def tuple[T1, T2, T3](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
-    tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]]
+      e1: Encoder[T1],
+      e2: Encoder[T2],
+      e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
   }
 
   def tuple[T1, T2, T3, T4](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3],
-      enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
-    tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]]
+      e1: Encoder[T1],
+      e2: Encoder[T2],
+      e3: Encoder[T3],
+      e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+    ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4))
   }
 
   def tuple[T1, T2, T3, T4, T5](
-      enc1: Encoder[T1],
-      enc2: Encoder[T2],
-      enc3: Encoder[T3],
-      enc4: Encoder[T4],
-      enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
-    tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]]))
-      .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]]
-  }
-
-  private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
-    assert(encoders.length > 1)
-    // make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`.
-    assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty))
-
-    val schema = StructType(encoders.zipWithIndex.map {
-      case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema)
-    })
-
-    val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
-
-    val extractExpressions = encoders.map {
-      case e if e.flat => 

spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fbe65c592 -> 90d71bff0


[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply
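
A sketch of the observable effect (paths and column names hypothetical; the exact plan strings vary): the PhysicalRDD node in explain() output now includes the pushed filters, so plans with and without pushdown can be told apart.

```
// A filter that can be pushed down to the Parquet data source.
val df = sqlContext.read.parquet("/tmp/events").filter("id > 100")
df.explain()
// Before: ... PhysicalRDD [id#0L,...], ParquetRelation[file:/tmp/events]
// After:  ... PhysicalRDD [id#0L,...], ParquetRelation[file:/tmp/events] PushedFilter: [GreaterThan(id,100)]
```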

Author: Zee Chen 

Closes #9679 from zeocio/spark-11390.

(cherry picked from commit 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d71bff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d71bff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d71bff

Branch: refs/heads/branch-1.6
Commit: 90d71bff0c583830aa3fd96b1dd3607f0cb0cbee
Parents: fbe65c5
Author: Zee Chen 
Authored: Mon Nov 16 14:21:28 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 14:21:41 2015 -0800

--
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  6 --
 .../execution/datasources/DataSourceStrategy.scala|  6 --
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++
 3 files changed, 22 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d..62620ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7..544d5ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3..dfec139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ 

spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master b1a966262 -> 985b38dd2


[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply

Author: Zee Chen 

Closes #9679 from zeocio/spark-11390.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/985b38dd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/985b38dd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/985b38dd

Branch: refs/heads/master
Commit: 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181
Parents: b1a9662
Author: Zee Chen 
Authored: Mon Nov 16 14:21:28 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 14:21:28 2015 -0800

--
 .../org/apache/spark/sql/execution/ExistingRDD.scala  |  6 --
 .../execution/datasources/DataSourceStrategy.scala|  6 --
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++
 3 files changed, 22 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 8b41d3d..62620ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
   def createFromDataSource(
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
-      relation: BaseRelation): PhysicalRDD = {
-    PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
+      relation: BaseRelation,
+      extraInformation: String = ""): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString + extraInformation,
+      relation.isInstanceOf[HadoopFsRelation])
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bbbfa7..544d5ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // `Filter`s or cannot be handled by `relation`.
     val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
 
+    val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
+
     if (projects.map(_.toAttribute) == projects &&
         projectSet.size == projects.size &&
         filterSet.subsetOf(projectSet)) {
@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
      val scan = execution.PhysicalRDD.createFromDataSource(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.PhysicalRDD.createFromDataSource(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation)
+        relation.relation, pushedFiltersString)
       execution.Project(
         projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index be53ec3..dfec139 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext {
 }
   }

spark git commit: Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 90d71bff0 -> 64439f7d6


Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of 
RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64439f7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64439f7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64439f7d

Branch: refs/heads/branch-1.6
Commit: 64439f7d61edfbaa5513ddcf3f9dc86fa905aef2
Parents: 90d71bf
Author: Davies Liu 
Authored: Mon Nov 16 14:50:38 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 14:51:25 2015 -0800

--
 core/pom.xml|  4 ++
 .../org/apache/spark/scheduler/MapStatus.scala  | 13 +++---
 .../spark/serializer/KryoSerializer.scala   | 10 +++-
 .../apache/spark/util/collection/BitSet.scala   | 28 ++-
 .../spark/serializer/KryoSerializerSuite.scala  |  6 +++
 .../spark/util/collection/BitSetSuite.scala | 49 
 pom.xml |  5 ++
 7 files changed, 33 insertions(+), 82 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a..37e3f16 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -178,6 +178,10 @@
   lz4
 
 
+  org.roaringbitmap
+  RoaringBitmap
+
+
   commons-net
   commons-net
 

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1..1efce12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
-private[this] var emptyBlocks: BitSet,
+private[this] var emptyBlocks: RoaringBitmap,
 private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-if (emptyBlocks.get(reduceId)) {
+if (emptyBlocks.contains(reduceId)) {
   0
 } else {
   avgSize
@@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 loc = BlockManagerId(in)
-emptyBlocks = new BitSet
+emptyBlocks = new RoaringBitmap()
 emptyBlocks.readExternal(in)
 avgSize = in.readLong()
   }
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+val emptyBlocks = new RoaringBitmap()
 val totalNumBlocks = uncompressedSizes.length
-val emptyBlocks = new BitSet(totalNumBlocks)
 while (i < totalNumBlocks) {
   var size = uncompressedSizes(i)
   if (size > 0) {
 numNonEmptyBlocks += 1
 totalSize += size
   } else {
-emptyBlocks.set(i)
+emptyBlocks.add(i)
   }
   i += 1
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f..c5195c1 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => 

spark git commit: Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 985b38dd2 -> 3c025087b


Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of 
RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c025087
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c025087
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c025087

Branch: refs/heads/master
Commit: 3c025087b58f475a9bcb5c8f4b2b2df804915b2b
Parents: 985b38d
Author: Davies Liu 
Authored: Mon Nov 16 14:50:38 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 14:50:38 2015 -0800

--
 core/pom.xml|  4 ++
 .../org/apache/spark/scheduler/MapStatus.scala  | 13 +++---
 .../spark/serializer/KryoSerializer.scala   | 10 +++-
 .../apache/spark/util/collection/BitSet.scala   | 28 ++-
 .../spark/serializer/KryoSerializerSuite.scala  |  6 +++
 .../spark/util/collection/BitSetSuite.scala | 49 
 pom.xml |  5 ++
 7 files changed, 33 insertions(+), 82 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 7e1205a..37e3f16 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -178,6 +178,10 @@
   lz4
 
 
+  org.roaringbitmap
+  RoaringBitmap
+
+
   commons-net
   commons-net
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 180c8d1..1efce12 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{Externalizable, ObjectInput, ObjectOutput}
 
+import org.roaringbitmap.RoaringBitmap
+
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.collection.BitSet
 import org.apache.spark.util.Utils
 
 /**
@@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus(
 private[spark] class HighlyCompressedMapStatus private (
 private[this] var loc: BlockManagerId,
 private[this] var numNonEmptyBlocks: Int,
-private[this] var emptyBlocks: BitSet,
+private[this] var emptyBlocks: RoaringBitmap,
 private[this] var avgSize: Long)
   extends MapStatus with Externalizable {
 
@@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private (
   override def location: BlockManagerId = loc
 
   override def getSizeForBlock(reduceId: Int): Long = {
-if (emptyBlocks.get(reduceId)) {
+if (emptyBlocks.contains(reduceId)) {
   0
 } else {
   avgSize
@@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private (
 
   override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
 loc = BlockManagerId(in)
-emptyBlocks = new BitSet
+emptyBlocks = new RoaringBitmap()
 emptyBlocks.readExternal(in)
 avgSize = in.readLong()
   }
@@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus {
     // From a compression standpoint, it shouldn't matter whether we track empty or non-empty
     // blocks. From a performance standpoint, we benefit from tracking empty blocks because
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+val emptyBlocks = new RoaringBitmap()
 val totalNumBlocks = uncompressedSizes.length
-val emptyBlocks = new BitSet(totalNumBlocks)
 while (i < totalNumBlocks) {
   var size = uncompressedSizes(i)
   if (size > 0) {
 numNonEmptyBlocks += 1
 totalSize += size
   } else {
-emptyBlocks.set(i)
+emptyBlocks.add(i)
   }
   i += 1
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index bc51d4f..c5195c1 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput, 

spark git commit: [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

2015-11-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master b0c3fd34e -> de5e531d3


[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

Using batching on the driver for the WriteAheadLog should be an improvement for all environments
and use cases. Users will be able to scale to a much higher number of receivers with the
BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period.

I've also added some tests to make sure the default configurations are correct 
regarding recent additions:
 - batching on by default
 - closeFileAfterWrite off by default
 - parallelRecovery off by default
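
Given the new default above, a sketch of how to fall back to the old behaviour if needed (the batching key is the one used in the tests below; the receiver WAL key is the usual prerequisite for using a WAL at all):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("wal-batching-example")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  // Batching on the driver WAL is now on by default; disable it explicitly if needed.
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
```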

Author: Burak Yavuz 

Closes #9695 from brkyvz/enable-batch-wal.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de5e531d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de5e531d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de5e531d

Branch: refs/heads/master
Commit: de5e531d337075fd849437e88846873bca8685e6
Parents: b0c3fd3
Author: Burak Yavuz 
Authored: Mon Nov 16 11:21:17 2015 -0800
Committer: Tathagata Das 
Committed: Mon Nov 16 11:21:17 2015 -0800

--
 .../streaming/util/WriteAheadLogUtils.scala |  2 +-
 .../spark/streaming/JavaWriteAheadLogSuite.java |  1 +
 .../streaming/ReceivedBlockTrackerSuite.scala   |  9 ++--
 .../streaming/util/WriteAheadLogSuite.scala | 24 +++-
 .../util/WriteAheadLogUtilsSuite.scala  | 19 +---
 5 files changed, 48 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 731a369..7f9e2c9 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   }
 
   def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
-    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false)
+    isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
--
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
 
b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index 175b8a4..09b5f8e 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
   public void testCustomWAL() {
 SparkConf conf = new SparkConf();
 conf.set("spark.streaming.driver.writeAheadLog.class", 
JavaWriteAheadLogSuite.class.getName());
+conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
 WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, 
null);
 
 String data1 = "data1";

http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 7db17ab..081f5a1 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
 : Seq[ReceivedBlockTrackerLogEvent] = {
 logFiles.flatMap {
   file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
-}.map { byteBuffer =>
-  Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
+}.flatMap { byteBuffer =>
+  val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, 
isDriver = true)) {
+
Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
+  } else {
+Array(byteBuffer)
+  }
+  validBuffer.map(b => 

spark git commit: [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

2015-11-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 f14fb291d -> 38673d7e6


[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

Using batching on the driver for the WriteAheadLog should be an improvement for
all environments and use cases. Users will be able to scale to a much higher
number of receivers with the BatchedWriteAheadLog. Therefore we should turn it
on by default and QA it during the QA period.

I've also added some tests to make sure the default configurations for recent
additions are correct:
 - batching on by default
 - closeFileAfterWrite off by default
 - parallelRecovery off by default

Author: Burak Yavuz 

Closes #9695 from brkyvz/enable-batch-wal.

(cherry picked from commit de5e531d337075fd849437e88846873bca8685e6)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38673d7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38673d7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38673d7e

Branch: refs/heads/branch-1.6
Commit: 38673d7e6358622f240d7331b061cadb96f8409f
Parents: f14fb29
Author: Burak Yavuz 
Authored: Mon Nov 16 11:21:17 2015 -0800
Committer: Tathagata Das 
Committed: Mon Nov 16 11:21:27 2015 -0800

--
 .../streaming/util/WriteAheadLogUtils.scala |  2 +-
 .../spark/streaming/JavaWriteAheadLogSuite.java |  1 +
 .../streaming/ReceivedBlockTrackerSuite.scala   |  9 ++--
 .../streaming/util/WriteAheadLogSuite.scala | 24 +++-
 .../util/WriteAheadLogUtilsSuite.scala  | 19 +---
 5 files changed, 48 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
index 731a369..7f9e2c9 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
@@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
   }
 
   def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
-isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = 
false)
+isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = 
true)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
--
diff --git 
a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
 
b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
index 175b8a4..09b5f8e 100644
--- 
a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
+++ 
b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
@@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog {
   public void testCustomWAL() {
 SparkConf conf = new SparkConf();
 conf.set("spark.streaming.driver.writeAheadLog.class", 
JavaWriteAheadLogSuite.class.getName());
+conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
 WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, 
null);
 
 String data1 = "data1";

http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 7db17ab..081f5a1 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
 : Seq[ReceivedBlockTrackerLogEvent] = {
 logFiles.flatMap {
   file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
-}.map { byteBuffer =>
-  Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
+}.flatMap { byteBuffer =>
+  val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, 
isDriver = true)) {
+

spark git commit: [SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

2015-11-16 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master ace0db471 -> 24477d270


[SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

Currently, if dynamic allocation is enabled, explicitly killing an executor gets
no response, so the executor metadata on the driver side is wrong, which makes
dynamic allocation on YARN fail to work.

The problem is that `disableExecutor` returns false for executors pending kill
when `onDisconnect` is detected, so no further handling is done.

One solution is to bypass these explicitly killed executors and use
`super.onDisconnect` to remove the executor. This is simple.

Another solution is to still query the loss reason for these explicitly killed
executors. Since an executor may be killed and reported in the same AM-RM
communication, the current way of adding a pending loss-reason request does not
work (the container-complete event has already been processed), so we should
store the loss reason for a later query.

This PR chooses solution 2.
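
As an informal sketch of the idea behind solution 2 (not the actual
YarnAllocator code; the object and method names here are made up), keeping the
loss reason around for a later query could look roughly like this:

import scala.collection.mutable

// Hypothetical illustration: remember diagnostics for containers that complete
// for executors we explicitly asked YARN to kill, so a later loss-reason query
// (arriving after the container-complete event) can still be answered.
object LossReasonCache {
  private val pendingRelease = mutable.HashSet.empty[String]            // executor IDs we asked to kill
  private val completedReasons = mutable.HashMap.empty[String, String]  // executorId -> diagnostics

  def markReleased(executorId: String): Unit = synchronized {
    pendingRelease += executorId
  }

  def onContainerComplete(executorId: String, diagnostics: String): Unit = synchronized {
    if (pendingRelease.remove(executorId)) {
      // Nobody has asked for this reason yet; keep it for a later query.
      completedReasons(executorId) = diagnostics
    }
  }

  def queryLossReason(executorId: String): Option[String] = synchronized {
    completedReasons.remove(executorId)
  }
}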

Please help to review. vanzin, I think this part was changed by you previously;
would you please help to review? Thanks a lot.

Author: jerryshao 

Closes #9684 from jerryshao/SPARK-11718.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24477d27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24477d27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24477d27

Branch: refs/heads/master
Commit: 24477d2705bcf2a851acc241deb8376c5450dc73
Parents: ace0db4
Author: jerryshao 
Authored: Mon Nov 16 11:43:18 2015 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 16 11:43:18 2015 -0800

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  1 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++--
 .../spark/deploy/yarn/YarnAllocator.scala   | 30 
 3 files changed, 29 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43d7d80..5f13669 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
  // If the host mapping still exists, it means we don't know the 
loss reason for the
  // executor. So call removeExecutor() to update tasks running on 
that executor when
  // the real loss reason is finally known.
+ logError(s"Actual reason for lost executor $executorId: 
${reason.message}")
  removeExecutor(executorId, reason)
 
case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71d98f..3373caf 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
  * Stop making resource offers for the given executor. The executor is 
marked as lost with
  * the loss reason still pending.
  *
- * @return Whether executor was alive.
+ * @return Whether executor should be disabled
  */
 protected def disableExecutor(executorId: String): Boolean = {
   val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
@@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   executorsPendingLossReason += executorId
   true
 } else {
-  false
+  // Returns true for explicitly killed executors, we also need to get 
pending loss reasons;
+  // For others return false.
+  executorsPendingToRemove.contains(executorId)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
--
diff --git 

spark git commit: [SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

2015-11-16 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 c83177d30 -> 38fe092ff


[SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

Currently, if dynamic allocation is enabled, explicitly killing an executor gets
no response, so the executor metadata on the driver side is wrong, which makes
dynamic allocation on YARN fail to work.

The problem is that `disableExecutor` returns false for executors pending kill
when `onDisconnect` is detected, so no further handling is done.

One solution is to bypass these explicitly killed executors and use
`super.onDisconnect` to remove the executor. This is simple.

Another solution is to still query the loss reason for these explicitly killed
executors. Since an executor may be killed and reported in the same AM-RM
communication, the current way of adding a pending loss-reason request does not
work (the container-complete event has already been processed), so we should
store the loss reason for a later query.

This PR chooses solution 2.

Please help to review. vanzin, I think this part was changed by you previously;
would you please help to review? Thanks a lot.

Author: jerryshao 

Closes #9684 from jerryshao/SPARK-11718.

(cherry picked from commit 24477d2705bcf2a851acc241deb8376c5450dc73)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38fe092f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38fe092f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38fe092f

Branch: refs/heads/branch-1.6
Commit: 38fe092ff3a1e86088a22adc934a4b4e269b0a47
Parents: c83177d
Author: jerryshao 
Authored: Mon Nov 16 11:43:18 2015 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 16 11:43:39 2015 -0800

--
 .../spark/scheduler/TaskSchedulerImpl.scala |  1 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |  6 ++--
 .../spark/deploy/yarn/YarnAllocator.scala   | 30 
 3 files changed, 29 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 43d7d80..5f13669 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl(
  // If the host mapping still exists, it means we don't know the 
loss reason for the
  // executor. So call removeExecutor() to update tasks running on 
that executor when
  // the real loss reason is finally known.
+ logError(s"Actual reason for lost executor $executorId: 
${reason.message}")
  removeExecutor(executorId, reason)
 
case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f71d98f..3373caf 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
  * Stop making resource offers for the given executor. The executor is 
marked as lost with
  * the loss reason still pending.
  *
- * @return Whether executor was alive.
+ * @return Whether executor should be disabled
  */
 protected def disableExecutor(executorId: String): Boolean = {
   val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
@@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   executorsPendingLossReason += executorId
   true
 } else {
-  false
+  // Returns true for explicitly killed executors, we also need to get 
pending loss reasons;
+  // For others return false.
+  executorsPendingToRemove.contains(executorId)
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 24477d270 -> b1a966262


[SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

These two are very similar, so we can consolidate them into one.

This change also adds tests for the consolidated method and fixes a bug.
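
As a quick usage sketch (not taken from this patch), the consolidated public
API can be called like this in Scala, assuming the INT/STRING/DOUBLE encoders
that Encoders already exposes; the value names are arbitrary and the
surrounding SQLContext/Dataset setup is omitted.

import org.apache.spark.sql.{Encoder, Encoders}

// Note the boxed Java element types: Encoders.INT is an Encoder[java.lang.Integer].
val pairEncoder: Encoder[(java.lang.Integer, String)] =
  Encoders.tuple(Encoders.INT, Encoders.STRING)

val tripleEncoder: Encoder[(java.lang.Integer, String, java.lang.Double)] =
  Encoders.tuple(Encoders.INT, Encoders.STRING, Encoders.DOUBLE)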

Author: Wenchen Fan 

Closes #9729 from cloud-fan/tuple.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a96626
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a96626
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a96626

Branch: refs/heads/master
Commit: b1a9662623951079e80bd7498e064c4cae4977e9
Parents: 24477d2
Author: Wenchen Fan 
Authored: Mon Nov 16 12:45:34 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 12:45:34 2015 -0800

--
 .../scala/org/apache/spark/sql/Encoder.scala|  95 -
 .../catalyst/encoders/ExpressionEncoder.scala   | 104 ++-
 .../catalyst/encoders/ProductEncoderSuite.scala |  29 ++
 3 files changed, 108 insertions(+), 120 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1a96626/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 5f619d6..c8b017e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Used to convert a JVM object of type `T` to and from the internal Spark SQL 
representation.
@@ -49,83 +47,34 @@ object Encoders {
   def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true)
   def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true)
 
-  def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = 
{
-tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]]))
-  .asInstanceOf[ExpressionEncoder[(T1, T2)]]
+  def tuple[T1, T2](
+  e1: Encoder[T1],
+  e2: Encoder[T2]): Encoder[(T1, T2)] = {
+ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2))
   }
 
   def tuple[T1, T2, T3](
-  enc1: Encoder[T1],
-  enc2: Encoder[T2],
-  enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
-tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]]))
-  .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]]
+  e1: Encoder[T1],
+  e2: Encoder[T2],
+  e3: Encoder[T3]): Encoder[(T1, T2, T3)] = {
+ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3))
   }
 
   def tuple[T1, T2, T3, T4](
-  enc1: Encoder[T1],
-  enc2: Encoder[T2],
-  enc3: Encoder[T3],
-  enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
-tuple(Seq(enc1, enc2, enc3, 
enc4).map(_.asInstanceOf[ExpressionEncoder[_]]))
-  .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]]
+  e1: Encoder[T1],
+  e2: Encoder[T2],
+  e3: Encoder[T3],
+  e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = {
+ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), 
encoderFor(e4))
   }
 
   def tuple[T1, T2, T3, T4, T5](
-  enc1: Encoder[T1],
-  enc2: Encoder[T2],
-  enc3: Encoder[T3],
-  enc4: Encoder[T4],
-  enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = {
-tuple(Seq(enc1, enc2, enc3, enc4, 
enc5).map(_.asInstanceOf[ExpressionEncoder[_]]))
-  .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]]
-  }
-
-  private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] 
= {
-assert(encoders.length > 1)
-// make sure all encoders are resolved, i.e. `Attribute` has been resolved 
to `BoundReference`.
-
assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty))
-
-val schema = StructType(encoders.zipWithIndex.map {
-  case (e, i) => StructField(s"_${i + 1}", if (e.flat) 
e.schema.head.dataType else e.schema)
-})
-
-val cls = 
Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
-
-val extractExpressions = encoders.map {
-  case e if e.flat => e.toRowExpressions.head
-  case other => CreateStruct(other.toRowExpressions)
-}.zipWithIndex.map { case (expr, index) =>
-  

spark git commit: [SPARK-6328][PYTHON] Python API for StreamingListener

2015-11-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 38673d7e6 -> c83177d30


[SPARK-6328][PYTHON] Python API for StreamingListener

Author: Daniel Jalova 

Closes #9186 from djalova/SPARK-6328.

(cherry picked from commit ace0db47141ffd457c2091751038fc291f6d5a8b)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c83177d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c83177d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c83177d3

Branch: refs/heads/branch-1.6
Commit: c83177d3045afccf07465c43caac74d980673f42
Parents: 38673d7
Author: Daniel Jalova 
Authored: Mon Nov 16 11:29:27 2015 -0800
Committer: Tathagata Das 
Committed: Mon Nov 16 11:29:36 2015 -0800

--
 python/pyspark/streaming/__init__.py|   3 +-
 python/pyspark/streaming/context.py |   8 ++
 python/pyspark/streaming/listener.py|  75 +++
 python/pyspark/streaming/tests.py   | 126 ++-
 .../api/java/JavaStreamingListener.scala|  76 +++
 5 files changed, 286 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c83177d3/python/pyspark/streaming/__init__.py
--
diff --git a/python/pyspark/streaming/__init__.py 
b/python/pyspark/streaming/__init__.py
index d2644a1..66e8f8e 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,5 +17,6 @@
 
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 
-__all__ = ['StreamingContext', 'DStream']
+__all__ = ['StreamingContext', 'DStream', 'StreamingListener']

http://git-wip-us.apache.org/repos/asf/spark/blob/c83177d3/python/pyspark/streaming/context.py
--
diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index 8be56c9..1388b6d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -363,3 +363,11 @@ class StreamingContext(object):
 first = dstreams[0]
 jrest = [d._jdstream for d in dstreams[1:]]
 return DStream(self._jssc.union(first._jdstream, jrest), self, 
first._jrdd_deserializer)
+
+def addStreamingListener(self, streamingListener):
+"""
+Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] 
object for
+receiving system events related to streaming.
+"""
+self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
+self._jvm.PythonStreamingListenerWrapper(streamingListener)))

http://git-wip-us.apache.org/repos/asf/spark/blob/c83177d3/python/pyspark/streaming/listener.py
--
diff --git a/python/pyspark/streaming/listener.py 
b/python/pyspark/streaming/listener.py
new file mode 100644
index 000..b830797
--- /dev/null
+++ b/python/pyspark/streaming/listener.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StreamingListener"]
+
+
+class StreamingListener(object):
+
+def __init__(self):
+pass
+
+def onReceiverStarted(self, receiverStarted):
+"""
+Called when a receiver has been started
+"""
+pass
+
+def onReceiverError(self, receiverError):
+"""
+Called when a receiver has reported an error
+"""
+pass
+
+def onReceiverStopped(self, receiverStopped):
+"""
+Called when a receiver has been stopped
+"""
+pass
+
+def onBatchSubmitted(self, batchSubmitted):
+"""
+Called when a batch of jobs has been submitted for processing.
+"""
+pass
+
+def onBatchStarted(self, batchStarted):
+"""

spark git commit: [SPARK-6328][PYTHON] Python API for StreamingListener

2015-11-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master de5e531d3 -> ace0db471


[SPARK-6328][PYTHON] Python API for StreamingListener

Author: Daniel Jalova 

Closes #9186 from djalova/SPARK-6328.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ace0db47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ace0db47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ace0db47

Branch: refs/heads/master
Commit: ace0db47141ffd457c2091751038fc291f6d5a8b
Parents: de5e531
Author: Daniel Jalova 
Authored: Mon Nov 16 11:29:27 2015 -0800
Committer: Tathagata Das 
Committed: Mon Nov 16 11:29:27 2015 -0800

--
 python/pyspark/streaming/__init__.py|   3 +-
 python/pyspark/streaming/context.py |   8 ++
 python/pyspark/streaming/listener.py|  75 +++
 python/pyspark/streaming/tests.py   | 126 ++-
 .../api/java/JavaStreamingListener.scala|  76 +++
 5 files changed, 286 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/__init__.py
--
diff --git a/python/pyspark/streaming/__init__.py 
b/python/pyspark/streaming/__init__.py
index d2644a1..66e8f8e 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,5 +17,6 @@
 
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 
-__all__ = ['StreamingContext', 'DStream']
+__all__ = ['StreamingContext', 'DStream', 'StreamingListener']

http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/context.py
--
diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index 8be56c9..1388b6d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -363,3 +363,11 @@ class StreamingContext(object):
 first = dstreams[0]
 jrest = [d._jdstream for d in dstreams[1:]]
 return DStream(self._jssc.union(first._jdstream, jrest), self, 
first._jrdd_deserializer)
+
+def addStreamingListener(self, streamingListener):
+"""
+Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] 
object for
+receiving system events related to streaming.
+"""
+self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
+self._jvm.PythonStreamingListenerWrapper(streamingListener)))

http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/listener.py
--
diff --git a/python/pyspark/streaming/listener.py 
b/python/pyspark/streaming/listener.py
new file mode 100644
index 000..b830797
--- /dev/null
+++ b/python/pyspark/streaming/listener.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StreamingListener"]
+
+
+class StreamingListener(object):
+
+def __init__(self):
+pass
+
+def onReceiverStarted(self, receiverStarted):
+"""
+Called when a receiver has been started
+"""
+pass
+
+def onReceiverError(self, receiverError):
+"""
+Called when a receiver has reported an error
+"""
+pass
+
+def onReceiverStopped(self, receiverStopped):
+"""
+Called when a receiver has been stopped
+"""
+pass
+
+def onBatchSubmitted(self, batchSubmitted):
+"""
+Called when a batch of jobs has been submitted for processing.
+"""
+pass
+
+def onBatchStarted(self, batchStarted):
+"""
+Called when processing of a batch of jobs has started.
+"""
+pass
+
+def onBatchCompleted(self, 

spark git commit: [SPARK-11625][SQL] add java test for typed aggregate

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 75ee12f09 -> fd14936be


[SPARK-11625][SQL] add java test for typed aggregate

Author: Wenchen Fan 

Closes #9591 from cloud-fan/agg-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd14936b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd14936b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd14936b

Branch: refs/heads/master
Commit: fd14936be7beff543dbbcf270f2f9749f7a803c4
Parents: 75ee12f
Author: Wenchen Fan 
Authored: Mon Nov 16 15:32:49 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 15:32:49 2015 -0800

--
 .../spark/api/java/function/Function.java   |  2 +-
 .../org/apache/spark/sql/GroupedDataset.scala   | 34 ++--
 .../spark/sql/expressions/Aggregator.scala  |  2 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 56 
 .../spark/sql/DatasetAggregatorSuite.scala  |  7 ++-
 5 files changed, 92 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fd14936b/core/src/main/java/org/apache/spark/api/java/function/Function.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function.java 
b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index d00551b..b9d9777 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -25,5 +25,5 @@ import java.io.Serializable;
  * when mapping RDDs of other types.
  */
 public interface Function extends Serializable {
-  public R call(T1 v1) throws Exception;
+  R call(T1 v1) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fd14936b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index ebcf4c8..467cd42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -145,9 +145,37 @@ class GroupedDataset[K, T] private[sql](
 reduce(f.call _)
   }
 
-  // To ensure valid overloading.
-  protected def agg(expr: Column, exprs: Column*): DataFrame =
-groupedData.agg(expr, exprs: _*)
+  /**
+   * Compute aggregates by specifying a series of aggregate columns, and 
return a [[DataFrame]].
+   * We can call `as[T : Encoder]` to turn the returned [[DataFrame]] to 
[[Dataset]] again.
+   *
+   * The available aggregate methods are defined in 
[[org.apache.spark.sql.functions]].
+   *
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *
+   *   // Scala:
+   *   import org.apache.spark.sql.functions._
+   *   df.groupBy("department").agg(max("age"), sum("expense"))
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df.groupBy("department").agg(max("age"), sum("expense"));
+   * }}}
+   *
+   * We can also use `Aggregator.toColumn` to pass in typed aggregate 
functions.
+   *
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame =
+groupedData.agg(withEncoder(expr), exprs.map(withEncoder): _*)
+
+  private def withEncoder(c: Column): Column = c match {
+case tc: TypedColumn[_, _] =>
+  tc.withInputType(resolvedTEncoder.bind(dataAttributes), dataAttributes)
+case _ => c
+  }
 
   /**
* Internal helper function for building typed aggregations that return 
tuples.  For simplicity

http://git-wip-us.apache.org/repos/asf/spark/blob/fd14936b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index 360c9a5..72610e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}
  * @tparam B The type of the intermediate value of the reduction.
  * @tparam C The type of the final result.
  */
-abstract class Aggregator[-A, B, C] {
+abstract class Aggregator[-A, B, C] extends Serializable {
 
   /** A zero value for this aggregation. Should satisfy the property that any 
b + zero = b */
  

spark git commit: [SPARK-11742][STREAMING] Add the failure info to the batch lists

2015-11-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 3c025087b -> bcea0bfda


[SPARK-11742][STREAMING] Add the failure info to the batch lists

Screenshot: https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png

Author: Shixiong Zhu 

Closes #9711 from zsxwing/failure-info.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcea0bfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcea0bfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcea0bfd

Branch: refs/heads/master
Commit: bcea0bfda66a30ee86790b048de5cb47b4d0b32f
Parents: 3c02508
Author: Shixiong Zhu 
Authored: Mon Nov 16 15:06:06 2015 -0800
Committer: Tathagata Das 
Committed: Mon Nov 16 15:06:06 2015 -0800

--
 .../spark/streaming/ui/AllBatchesTable.scala| 61 ++--
 .../apache/spark/streaming/ui/BatchPage.scala   | 49 ++--
 .../org/apache/spark/streaming/ui/UIUtils.scala | 60 +++
 3 files changed, 120 insertions(+), 50 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bcea0bfd/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 125cafd..d339723 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, 
batchInterval: Long)
 {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", 
"top")}
   }
 
+  /**
+   * Return the first failure reason if finding in the batches.
+   */
+  protected def getFirstFailureReason(batches: Seq[BatchUIData]): 
Option[String] = {
+batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
+  }
+
+  protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+val firstFailureReason = 
batch.outputOperations.flatMap(_._2.failureReason).headOption
+firstFailureReason.map { failureReason =>
+  val failureReasonForUI = 
UIUtils.createOutputOperationFailureForUI(failureReason)
+  UIUtils.failureReasonCell(
+failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = 
false)
+}.getOrElse(-)
+  }
+
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
 val batchTime = batch.batchTime.milliseconds
 val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
@@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable(
 waitingBatches: Seq[BatchUIData],
 batchInterval: Long) extends BatchTableBase("active-batches-table", 
batchInterval) {
 
+  private val firstFailureReason = getFirstFailureReason(runningBatches)
+
   override protected def columns: Seq[Node] = super.columns ++ {
 Output Ops: Succeeded/Total
-  Status
+  Status ++ {
+  if (firstFailureReason.nonEmpty) {
+Error
+  } else {
+Nil
+  }
+}
   }
 
   override protected def renderRows: Seq[Node] = {
@@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable(
   }
 
   private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
processing
+baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
processing ++ {
+  if (firstFailureReason.nonEmpty) {
+getFirstFailureTableCell(batch)
+  } else {
+Nil
+  }
+}
   }
 
   private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
queued
+baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
queued++ {
+  if (firstFailureReason.nonEmpty) {
+// Waiting batches have not run yet, so must have no failure reasons.
+-
+  } else {
+Nil
+  }
+}
   }
 }
 
 private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], 
batchInterval: Long)
   extends BatchTableBase("completed-batches-table", batchInterval) {
 
+  private val firstFailureReason = getFirstFailureReason(batches)
+
   override protected def columns: Seq[Node] = super.columns ++ {
 Total Delay {SparkUIUtils.tooltip("Total time taken to handle a 
batch", "top")}
-  Output Ops: Succeeded/Total
+  Output Ops: Succeeded/Total ++ {
+  if (firstFailureReason.nonEmpty) {
+Error
+  } else {
+Nil
+  }
+}
   }
 
   override protected def renderRows: Seq[Node] = {
@@ -138,6 +183,12 @@ private[ui] 

spark git commit: [SPARK-11742][STREAMING] Add the failure info to the batch lists

2015-11-16 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 64439f7d6 -> 3bd72eafc


[SPARK-11742][STREAMING] Add the failure info to the batch lists

Screenshot: https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png

Author: Shixiong Zhu 

Closes #9711 from zsxwing/failure-info.

(cherry picked from commit bcea0bfda66a30ee86790b048de5cb47b4d0b32f)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd72eaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd72eaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd72eaf

Branch: refs/heads/branch-1.6
Commit: 3bd72eafc5b7204d49fc1a3d587337fae88d8ae8
Parents: 64439f7
Author: Shixiong Zhu 
Authored: Mon Nov 16 15:06:06 2015 -0800
Committer: Tathagata Das 
Committed: Mon Nov 16 15:06:20 2015 -0800

--
 .../spark/streaming/ui/AllBatchesTable.scala| 61 ++--
 .../apache/spark/streaming/ui/BatchPage.scala   | 49 ++--
 .../org/apache/spark/streaming/ui/UIUtils.scala | 60 +++
 3 files changed, 120 insertions(+), 50 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3bd72eaf/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index 125cafd..d339723 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, 
batchInterval: Long)
 {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", 
"top")}
   }
 
+  /**
+   * Return the first failure reason if finding in the batches.
+   */
+  protected def getFirstFailureReason(batches: Seq[BatchUIData]): 
Option[String] = {
+batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
+  }
+
+  protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+val firstFailureReason = 
batch.outputOperations.flatMap(_._2.failureReason).headOption
+firstFailureReason.map { failureReason =>
+  val failureReasonForUI = 
UIUtils.createOutputOperationFailureForUI(failureReason)
+  UIUtils.failureReasonCell(
+failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = 
false)
+}.getOrElse(-)
+  }
+
   protected def baseRow(batch: BatchUIData): Seq[Node] = {
 val batchTime = batch.batchTime.milliseconds
 val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
@@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable(
 waitingBatches: Seq[BatchUIData],
 batchInterval: Long) extends BatchTableBase("active-batches-table", 
batchInterval) {
 
+  private val firstFailureReason = getFirstFailureReason(runningBatches)
+
   override protected def columns: Seq[Node] = super.columns ++ {
 Output Ops: Succeeded/Total
-  Status
+  Status ++ {
+  if (firstFailureReason.nonEmpty) {
+Error
+  } else {
+Nil
+  }
+}
   }
 
   override protected def renderRows: Seq[Node] = {
@@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable(
   }
 
   private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
-baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
processing
+baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
processing ++ {
+  if (firstFailureReason.nonEmpty) {
+getFirstFailureTableCell(batch)
+  } else {
+Nil
+  }
+}
   }
 
   private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
-baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
queued
+baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ 
queued++ {
+  if (firstFailureReason.nonEmpty) {
+// Waiting batches have not run yet, so must have no failure reasons.
+-
+  } else {
+Nil
+  }
+}
   }
 }
 
 private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], 
batchInterval: Long)
   extends BatchTableBase("completed-batches-table", batchInterval) {
 
+  private val firstFailureReason = getFirstFailureReason(batches)
+
   override protected def columns: Seq[Node] = super.columns ++ {
 Total Delay {SparkUIUtils.tooltip("Total time taken to handle a 
batch", "top")}
-  Output Ops: Succeeded/Total
+  Output Ops: Succeeded/Total ++ {
+  if (firstFailureReason.nonEmpty) {
+Error
+ 

spark git commit: [SPARK-11553][SQL] Primitive Row accessors should not convert null to default value

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 3bd72eafc -> 6c8e0c0ff


[SPARK-11553][SQL] Primitive Row accessors should not convert null to default 
value

Invoking getters for types extending AnyVal returned the default value (if the
field value was null) instead of throwing an NPE. Please check the comments on
the SPARK-11553 issue for more details.
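
For illustration only (not code from the patch), the sketch below shows the
behavior this change enforces; the Row literal is hypothetical and any schema
or session setup is omitted.

import org.apache.spark.sql.Row

// A row whose first field is null (hypothetical data).
val row = Row(null, "abc")

// Before this fix, row.getInt(0) silently returned 0 for the null field;
// after it, the primitive getters throw NullPointerException instead,
// so callers should guard with isNullAt.
val value = if (row.isNullAt(0)) -1 else row.getInt(0)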

Author: Bartlomiej Alberski 

Closes #9642 from alberskib/bugfix/SPARK-11553.

(cherry picked from commit 31296628ac7cd7be71e0edca335dc8604f62bb47)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c8e0c0f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c8e0c0f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c8e0c0f

Branch: refs/heads/branch-1.6
Commit: 6c8e0c0ff4fd3acbe0dbe7119920ee8b74d191dd
Parents: 3bd72ea
Author: Bartlomiej Alberski 
Authored: Mon Nov 16 15:14:38 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 15:14:48 2015 -0800

--
 .../main/scala/org/apache/spark/sql/Row.scala   | 32 -
 .../scala/org/apache/spark/sql/RowTest.scala| 20 +++
 .../local/NestedLoopJoinNodeSuite.scala | 36 
 3 files changed, 65 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6c8e0c0f/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 0f0f200..b14c66c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -191,7 +191,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getBoolean(i: Int): Boolean = getAs[Boolean](i)
+  def getBoolean(i: Int): Boolean = getAnyValAs[Boolean](i)
 
   /**
* Returns the value at position i as a primitive byte.
@@ -199,7 +199,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getByte(i: Int): Byte = getAs[Byte](i)
+  def getByte(i: Int): Byte = getAnyValAs[Byte](i)
 
   /**
* Returns the value at position i as a primitive short.
@@ -207,7 +207,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getShort(i: Int): Short = getAs[Short](i)
+  def getShort(i: Int): Short = getAnyValAs[Short](i)
 
   /**
* Returns the value at position i as a primitive int.
@@ -215,7 +215,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getInt(i: Int): Int = getAs[Int](i)
+  def getInt(i: Int): Int = getAnyValAs[Int](i)
 
   /**
* Returns the value at position i as a primitive long.
@@ -223,7 +223,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getLong(i: Int): Long = getAs[Long](i)
+  def getLong(i: Int): Long = getAnyValAs[Long](i)
 
   /**
* Returns the value at position i as a primitive float.
@@ -232,7 +232,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getFloat(i: Int): Float = getAs[Float](i)
+  def getFloat(i: Int): Float = getAnyValAs[Float](i)
 
   /**
* Returns the value at position i as a primitive double.
@@ -240,13 +240,12 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getDouble(i: Int): Double = getAs[Double](i)
+  def getDouble(i: Int): Double = getAnyValAs[Double](i)
 
   /**
* Returns the value at position i as a String object.
*
* @throws ClassCastException when data type does not match.
-   * @throws NullPointerException when value is null.
*/
   def getString(i: Int): String = getAs[String](i)
 
@@ -318,6 +317,8 @@ trait Row extends Serializable {
 
   /**
* Returns the value at position i.
+   * For primitive types if value is null it returns 'zero value' specific for 
primitive
+   * ie. 0 for Int - use isNullAt to ensure that value is not null
*
* @throws ClassCastException when 

spark git commit: [SPARKR][HOTFIX] Disable flaky SparkR package build test

2015-11-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4f8c7e18f -> bb044ec22


[SPARKR][HOTFIX] Disable flaky SparkR package build test

See https://github.com/apache/spark/pull/9390#issuecomment-157160063 and 
https://gist.github.com/shivaram/3a2fecce60768a603dac for more information

Author: Shivaram Venkataraman 

Closes #9744 from shivaram/sparkr-package-test-disable.

(cherry picked from commit ea6f53e48a911b49dc175ccaac8c80e0a1d97a09)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb044ec2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb044ec2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb044ec2

Branch: refs/heads/branch-1.6
Commit: bb044ec2278e5623e8fd3ae1f3172b5047b83439
Parents: 4f8c7e1
Author: Shivaram Venkataraman 
Authored: Mon Nov 16 16:57:50 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 16 16:58:02 2015 -0800

--
 .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala| 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bb044ec2/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 42e748e..d494b0c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -369,7 +369,9 @@ class SparkSubmitSuite
 }
   }
 
-  test("correctly builds R packages included in a jar with --packages") {
+  // TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds.
+  // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log
+  ignore("correctly builds R packages included in a jar with --packages") {
 assume(RUtils.isRInstalled, "R isn't installed on this machine.")
 val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
 val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))





spark git commit: [SPARKR][HOTFIX] Disable flaky SparkR package build test

2015-11-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master fd14936be -> ea6f53e48


[SPARKR][HOTFIX] Disable flaky SparkR package build test

See https://github.com/apache/spark/pull/9390#issuecomment-157160063 and 
https://gist.github.com/shivaram/3a2fecce60768a603dac for more information

Author: Shivaram Venkataraman 

Closes #9744 from shivaram/sparkr-package-test-disable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea6f53e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea6f53e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea6f53e4

Branch: refs/heads/master
Commit: ea6f53e48a911b49dc175ccaac8c80e0a1d97a09
Parents: fd14936
Author: Shivaram Venkataraman 
Authored: Mon Nov 16 16:57:50 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 16 16:57:50 2015 -0800

--
 .../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala| 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ea6f53e4/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 42e748e..d494b0c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -369,7 +369,9 @@ class SparkSubmitSuite
 }
   }
 
-  test("correctly builds R packages included in a jar with --packages") {
+  // TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds.
+  // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log
+  ignore("correctly builds R packages included in a jar with --packages") {
 assume(RUtils.isRInstalled, "R isn't installed on this machine.")
 val main = MavenCoordinate("my.great.lib", "mylib", "0.1")
 val sparkHome = sys.props.getOrElse("spark.test.home", 
fail("spark.test.home is not set!"))





spark git commit: [SPARK-8658][SQL] AttributeReference's equals method compares all the members

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6c8e0c0ff -> e042780cd


[SPARK-8658][SQL] AttributeReference's equals method compares all the members

This fix changes the equals method of AttributeReference to check all of the
specified fields for equality.

Author: gatorsmile 

Closes #9216 from gatorsmile/namedExpressEqual.

(cherry picked from commit 75ee12f09c2645c1ad682764d512965f641eb5c2)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e042780c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e042780c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e042780c

Branch: refs/heads/branch-1.6
Commit: e042780cd95f5bf3988dd73bc4431c3b012c171c
Parents: 6c8e0c0
Author: gatorsmile 
Authored: Mon Nov 16 15:22:12 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 15:22:24 2015 -0800

--
 .../sql/catalyst/expressions/namedExpressions.scala |  4 +++-
 .../sql/catalyst/plans/logical/basicOperators.scala | 10 +-
 .../sql/catalyst/plans/physical/partitioning.scala  | 12 ++--
 3 files changed, 14 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e042780c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f80bcfc..e3dadda 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -194,7 +194,9 @@ case class AttributeReference(
   def sameRef(other: AttributeReference): Boolean = this.exprId == other.exprId
 
   override def equals(other: Any): Boolean = other match {
-case ar: AttributeReference => name == ar.name && exprId == ar.exprId && 
dataType == ar.dataType
+case ar: AttributeReference =>
+  name == ar.name && dataType == ar.dataType && nullable == ar.nullable &&
+metadata == ar.metadata && exprId == ar.exprId && qualifiers == 
ar.qualifiers
 case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e042780c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index e2b97b2..45630a5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.OpenHashSet
+import scala.collection.mutable.ArrayBuffer
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) 
extends UnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
@@ -244,12 +244,12 @@ private[sql] object Expand {
*/
   private def buildNonSelectExprSet(
   bitmask: Int,
-  exprs: Seq[Expression]): OpenHashSet[Expression] = {
-val set = new OpenHashSet[Expression](2)
+  exprs: Seq[Expression]): ArrayBuffer[Expression] = {
+val set = new ArrayBuffer[Expression](2)
 
 var bit = exprs.length - 1
 while (bit >= 0) {
-  if (((bitmask >> bit) & 1) == 0) set.add(exprs(bit))
+  if (((bitmask >> bit) & 1) == 0) set += exprs(bit)
   bit -= 1
 }
 
@@ -279,7 +279,7 @@ private[sql] object Expand {
 
   (child.output :+ gid).map(expr => expr transformDown {
 // TODO this causes a problem when a column is used both for grouping 
and aggregation.
-case x: Expression if nonSelectedGroupExprSet.contains(x) =>
+case x: Expression if 
nonSelectedGroupExprSet.exists(_.semanticEquals(x)) =>
   // if the input attribute in the Invalid Grouping Expression set of 
for this group
   // replace it with constant null
   Literal.create(null, expr.dataType)


spark git commit: [SPARK-8658][SQL] AttributeReference's equals method compares all the members

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 31296628a -> 75ee12f09


[SPARK-8658][SQL] AttributeReference's equals method compares all the members

This fix changes the equals method of AttributeReference to check all of the
specified fields for equality.

Author: gatorsmile 

Closes #9216 from gatorsmile/namedExpressEqual.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75ee12f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75ee12f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75ee12f0

Branch: refs/heads/master
Commit: 75ee12f09c2645c1ad682764d512965f641eb5c2
Parents: 3129662
Author: gatorsmile 
Authored: Mon Nov 16 15:22:12 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 15:22:12 2015 -0800

--
 .../sql/catalyst/expressions/namedExpressions.scala |  4 +++-
 .../sql/catalyst/plans/logical/basicOperators.scala | 10 +-
 .../sql/catalyst/plans/physical/partitioning.scala  | 12 ++--
 3 files changed, 14 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75ee12f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index f80bcfc..e3dadda 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -194,7 +194,9 @@ case class AttributeReference(
   def sameRef(other: AttributeReference): Boolean = this.exprId == other.exprId
 
   override def equals(other: Any): Boolean = other match {
-case ar: AttributeReference => name == ar.name && exprId == ar.exprId && 
dataType == ar.dataType
+case ar: AttributeReference =>
+  name == ar.name && dataType == ar.dataType && nullable == ar.nullable &&
+metadata == ar.metadata && exprId == ar.exprId && qualifiers == 
ar.qualifiers
 case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/75ee12f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index e2b97b2..45630a5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.collection.OpenHashSet
+import scala.collection.mutable.ArrayBuffer
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) 
extends UnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
@@ -244,12 +244,12 @@ private[sql] object Expand {
*/
   private def buildNonSelectExprSet(
   bitmask: Int,
-  exprs: Seq[Expression]): OpenHashSet[Expression] = {
-val set = new OpenHashSet[Expression](2)
+  exprs: Seq[Expression]): ArrayBuffer[Expression] = {
+val set = new ArrayBuffer[Expression](2)
 
 var bit = exprs.length - 1
 while (bit >= 0) {
-  if (((bitmask >> bit) & 1) == 0) set.add(exprs(bit))
+  if (((bitmask >> bit) & 1) == 0) set += exprs(bit)
   bit -= 1
 }
 
@@ -279,7 +279,7 @@ private[sql] object Expand {
 
   (child.output :+ gid).map(expr => expr transformDown {
 // TODO this causes a problem when a column is used both for grouping 
and aggregation.
-case x: Expression if nonSelectedGroupExprSet.contains(x) =>
+case x: Expression if 
nonSelectedGroupExprSet.exists(_.semanticEquals(x)) =>
   // if the input attribute in the Invalid Grouping Expression set of 
for this group
   // replace it with constant null
   Literal.create(null, expr.dataType)

http://git-wip-us.apache.org/repos/asf/spark/blob/75ee12f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
--
diff --git 

spark git commit: [SPARK-11625][SQL] add java test for typed aggregate

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e042780cd -> 4f8c7e18f


[SPARK-11625][SQL] add java test for typed aggregate

Author: Wenchen Fan 

Closes #9591 from cloud-fan/agg-test.

(cherry picked from commit fd14936be7beff543dbbcf270f2f9749f7a803c4)
Signed-off-by: Michael Armbrust 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8c7e18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8c7e18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8c7e18

Branch: refs/heads/branch-1.6
Commit: 4f8c7e18f3103ee1fcf5a79c1d39cf5a81e78c87
Parents: e042780
Author: Wenchen Fan 
Authored: Mon Nov 16 15:32:49 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 15:35:21 2015 -0800

--
 .../spark/api/java/function/Function.java   |  2 +-
 .../org/apache/spark/sql/GroupedDataset.scala   | 34 ++--
 .../spark/sql/expressions/Aggregator.scala  |  2 +-
 .../org/apache/spark/sql/JavaDatasetSuite.java  | 56 
 .../spark/sql/DatasetAggregatorSuite.scala  |  7 ++-
 5 files changed, 92 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f8c7e18/core/src/main/java/org/apache/spark/api/java/function/Function.java
--
diff --git 
a/core/src/main/java/org/apache/spark/api/java/function/Function.java 
b/core/src/main/java/org/apache/spark/api/java/function/Function.java
index d00551b..b9d9777 100644
--- a/core/src/main/java/org/apache/spark/api/java/function/Function.java
+++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java
@@ -25,5 +25,5 @@ import java.io.Serializable;
  * when mapping RDDs of other types.
  */
 public interface Function<T1, R> extends Serializable {
-  public R call(T1 v1) throws Exception;
+  R call(T1 v1) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8c7e18/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
index ebcf4c8..467cd42 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala
@@ -145,9 +145,37 @@ class GroupedDataset[K, T] private[sql](
 reduce(f.call _)
   }
 
-  // To ensure valid overloading.
-  protected def agg(expr: Column, exprs: Column*): DataFrame =
-groupedData.agg(expr, exprs: _*)
+  /**
+   * Compute aggregates by specifying a series of aggregate columns, and 
return a [[DataFrame]].
+   * We can call `as[T : Encoder]` to turn the returned [[DataFrame]] to 
[[Dataset]] again.
+   *
+   * The available aggregate methods are defined in 
[[org.apache.spark.sql.functions]].
+   *
+   * {{{
+   *   // Selects the age of the oldest employee and the aggregate expense for 
each department
+   *
+   *   // Scala:
+   *   import org.apache.spark.sql.functions._
+   *   df.groupBy("department").agg(max("age"), sum("expense"))
+   *
+   *   // Java:
+   *   import static org.apache.spark.sql.functions.*;
+   *   df.groupBy("department").agg(max("age"), sum("expense"));
+   * }}}
+   *
+   * We can also use `Aggregator.toColumn` to pass in typed aggregate 
functions.
+   *
+   * @since 1.6.0
+   */
+  @scala.annotation.varargs
+  def agg(expr: Column, exprs: Column*): DataFrame =
+groupedData.agg(withEncoder(expr), exprs.map(withEncoder): _*)
+
+  private def withEncoder(c: Column): Column = c match {
+case tc: TypedColumn[_, _] =>
+  tc.withInputType(resolvedTEncoder.bind(dataAttributes), dataAttributes)
+case _ => c
+  }
 
   /**
* Internal helper function for building typed aggregations that return 
tuples.  For simplicity

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8c7e18/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
index 360c9a5..72610e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala
@@ -47,7 +47,7 @@ import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn}
  * @tparam B The type of the intermediate value of the reduction.
  * @tparam C The type of the final result.
  */
-abstract class Aggregator[-A, B, C] {
+abstract class 

spark git commit: [SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync

2015-11-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bb044ec22 -> e4abfe932


[SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using 
AsyncRDDActions#takeAsync

When we call AsyncRDDActions#takeAsync, DAGScheduler#runJob is actually invoked
from another thread, so we cannot get proper call site information.

The following screenshots show the UI before and after this patch is applied.

Before:
https://cloud.githubusercontent.com/assets/4736016/10914069/0ffc1306-8294-11e5-8e89-c4fadf58dd12.png
https://cloud.githubusercontent.com/assets/4736016/10914070/0ffe84ce-8294-11e5-8b2a-69d36276bedb.png

After:
https://cloud.githubusercontent.com/assets/4736016/10914080/1d8cfb7a-8294-11e5-9e09-ede25c2563e8.png
https://cloud.githubusercontent.com/assets/4736016/10914081/1d934e3a-8294-11e5-8b5e-e3dc37aaced3.png
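
The underlying pattern is to capture the thread-local call site on the calling
thread and re-install it on the thread that actually runs the job. A minimal,
generic Scala sketch of that pattern, using a plain ThreadLocal and Future
rather than Spark's internal SparkContext.getCallSite/setCallSite shown in the
diff below:

  import java.util.concurrent.Executors
  import scala.concurrent.duration._
  import scala.concurrent.{Await, ExecutionContext, Future}

  object CallSitePropagationDemo extends App {
    // Stand-in for the call-site info Spark keeps in a thread local.
    val callSite = new ThreadLocal[String] {
      override def initialValue(): String = "unknown"
    }

    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    callSite.set("takeAsync at Demo.scala:12") // set on the calling thread
    val captured = callSite.get()              // capture before hopping threads

    val job = Future {
      callSite.set(captured) // re-install on the worker thread, as the patch does
      println(s"call site seen by the job: ${callSite.get()}")
    }
    Await.ready(job, 10.seconds)
    pool.shutdown()
  }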

Author: Kousuke Saruta 

Closes #9437 from sarutak/SPARK-11480.

(cherry picked from commit 30f3cfda1cce8760f15c0a48a8c47f09a5167fca)
Signed-off-by: Andrew Or 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4abfe93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4abfe93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4abfe93

Branch: refs/heads/branch-1.6
Commit: e4abfe9324e3fdebace8b33cc6616f382ad7ac45
Parents: bb044ec
Author: Kousuke Saruta 
Authored: Mon Nov 16 16:59:16 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 16 16:59:23 2015 -0800

--
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e4abfe93/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala 
b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ca1eb1f..d5e8536 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -66,6 +66,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends 
Serializable with Loggi
*/
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
 val f = new ComplexFutureAction[Seq[T]]
+val callSite = self.context.getCallSite
 
 f.run {
   // This is a blocking action so we should use 
"AsyncRDDActions.futureExecutionContext" which
@@ -73,6 +74,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends 
Serializable with Loggi
   val results = new ArrayBuffer[T](num)
   val totalParts = self.partitions.length
   var partsScanned = 0
+  self.context.setCallSite(callSite)
   while (results.size < num && partsScanned < totalParts) {
 // The number of partitions to try in this iteration. It is ok for 
this number to be
 // greater than totalParts because we actually cap it at totalParts in 
runJob.





spark git commit: [SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync

2015-11-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master ea6f53e48 -> 30f3cfda1


[SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using 
AsyncRDDActions#takeAsync

When we call AsyncRDDActions#takeAsync, DAGScheduler#runJob is actually invoked
from another thread, so we cannot get proper call site information.

The following screenshots show the UI before and after this patch is applied.

Before:
https://cloud.githubusercontent.com/assets/4736016/10914069/0ffc1306-8294-11e5-8e89-c4fadf58dd12.png
https://cloud.githubusercontent.com/assets/4736016/10914070/0ffe84ce-8294-11e5-8b2a-69d36276bedb.png

After:
https://cloud.githubusercontent.com/assets/4736016/10914080/1d8cfb7a-8294-11e5-9e09-ede25c2563e8.png
https://cloud.githubusercontent.com/assets/4736016/10914081/1d934e3a-8294-11e5-8b5e-e3dc37aaced3.png

Author: Kousuke Saruta 

Closes #9437 from sarutak/SPARK-11480.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30f3cfda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30f3cfda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30f3cfda

Branch: refs/heads/master
Commit: 30f3cfda1cce8760f15c0a48a8c47f09a5167fca
Parents: ea6f53e
Author: Kousuke Saruta 
Authored: Mon Nov 16 16:59:16 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 16 16:59:16 2015 -0800

--
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 ++
 1 file changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/30f3cfda/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala 
b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ca1eb1f..d5e8536 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -66,6 +66,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends 
Serializable with Loggi
*/
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
 val f = new ComplexFutureAction[Seq[T]]
+val callSite = self.context.getCallSite
 
 f.run {
   // This is a blocking action so we should use 
"AsyncRDDActions.futureExecutionContext" which
@@ -73,6 +74,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends 
Serializable with Loggi
   val results = new ArrayBuffer[T](num)
   val totalParts = self.partitions.length
   var partsScanned = 0
+  self.context.setCallSite(callSite)
   while (results.size < num && partsScanned < totalParts) {
 // The number of partitions to try in this iteration. It is ok for 
this number to be
 // greater than totalParts because we actually cap it at totalParts in 
runJob.





spark git commit: [SPARK-11710] Document new memory management model

2015-11-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 30f3cfda1 -> 33a0ec937


[SPARK-11710] Document new memory management model

Author: Andrew Or 

Closes #9676 from andrewor14/memory-management-docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33a0ec93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33a0ec93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33a0ec93

Branch: refs/heads/master
Commit: 33a0ec93771ef5c3b388165b07cfab9014918d3b
Parents: 30f3cfd
Author: Andrew Or 
Authored: Mon Nov 16 17:00:18 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 16 17:00:18 2015 -0800

--
 docs/configuration.md | 13 ++-
 docs/tuning.md| 54 ++
 2 files changed, 44 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/33a0ec93/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index d961f43..c496146 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -722,17 +722,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Fraction of the heap space used for execution and storage. The lower this 
is, the more
 frequently spills and cached data eviction occur. The purpose of this 
config is to set
 aside memory for internal metadata, user data structures, and imprecise 
size estimation
-in the case of sparse, unusually large records.
+in the case of sparse, unusually large records. Leaving this at the 
default value is
+recommended. For more detail, see 
+this description.
   
 
 
   spark.memory.storageFraction
   0.5
   
-T​he size of the storage region within the space set aside by
-s​park.memory.fraction. This region is not statically 
reserved, but dynamically
-allocated as cache requests come in. ​Cached data may be evicted only if 
total storage exceeds
-this region.
+Amount of storage memory immune to eviction, expressed as a fraction of 
the size of the
+region set aside by s​park.memory.fraction. The higher this 
is, the less
+working memory may be available to execution and tasks may spill to disk 
more often.
+Leaving this at the default value is recommended. For more detail, see
+this description.
   
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/33a0ec93/docs/tuning.md
--
diff --git a/docs/tuning.md b/docs/tuning.md
index 879340a..a8fe7a4 100644
--- a/docs/tuning.md
+++ b/docs/tuning.md
@@ -88,9 +88,39 @@ than the "raw" data inside their fields. This is due to 
several reasons:
   but also pointers (typically 8 bytes each) to the next object in the list.
 * Collections of primitive types often store them as "boxed" objects such as 
`java.lang.Integer`.
 
-This section will discuss how to determine the memory usage of your objects, 
and how to improve
-it -- either by changing your data structures, or by storing data in a 
serialized format.
-We will then cover tuning Spark's cache size and the Java garbage collector.
+This section will start with an overview of memory management in Spark, then 
discuss specific
+strategies the user can take to make more efficient use of memory in his/her 
application. In
+particular, we will describe how to determine the memory usage of your 
objects, and how to
+improve it -- either by changing your data structures, or by storing data in a 
serialized
+format. We will then cover tuning Spark's cache size and the Java garbage 
collector.
+
+## Memory Management Overview
+
+Memory usage in Spark largely falls under one of two categories: execution and 
storage.
+Execution memory refers to that used for computation in shuffles, joins, sorts 
and aggregations,
+while storage memory refers to that used for caching and propagating internal 
data across the
+cluster. In Spark, execution and storage share a unified region (M). When no 
execution memory is
+used, storage can acquire all the available memory and vice versa. Execution 
may evict storage
+if necessary, but only until total storage memory usage falls under a certain 
threshold (R).
+In other words, `R` describes a subregion within `M` where cached blocks are 
never evicted.
+Storage may not evict execution due to complexities in implementation.
+
+This design ensures several desirable properties. First, applications that do 
not use caching
+can use the entire space for execution, obviating unnecessary disk spills. 
Second, applications
+that do use caching can reserve a minimum storage space (R) where their data 
blocks are immune
+to 
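
As a hedged illustration of how the two settings documented above can be tuned
from application code (the values below are purely illustrative; as the docs
say, leaving the defaults is recommended):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("memory-config-sketch")
    .set("spark.memory.fraction", "0.75")        // heap share for execution + storage (M)
    .set("spark.memory.storageFraction", "0.5")  // share of M immune to eviction (R)

  val sc = new SparkContext(conf)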

spark git commit: [EXAMPLE][MINOR] Add missing awaitTermination in click stream example

2015-11-16 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/master 33a0ec937 -> bd10eb81c


[EXAMPLE][MINOR] Add missing awaitTermination in click stream example
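
For context, a streaming driver must block after start(), otherwise the process
can exit before any batch is processed. A minimal hedged sketch of that shape
(the socket source and port are illustrative, not the example's actual input):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setMaster("local[2]").setAppName("await-sketch")
  val ssc = new StreamingContext(conf, Seconds(1))

  ssc.socketTextStream("localhost", 9999).count().print()

  ssc.start()
  ssc.awaitTermination() // without this the driver would return immediately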

Author: jerryshao 

Closes #9730 from jerryshao/clickstream-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd10eb81
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd10eb81
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd10eb81

Branch: refs/heads/master
Commit: bd10eb81c98e5e9df453f721943a3e82d9f74ae4
Parents: 33a0ec9
Author: jerryshao 
Authored: Mon Nov 16 17:02:21 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 16 17:02:21 2015 -0800

--
 .../spark/examples/streaming/clickstream/PageViewStream.scala| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bd10eb81/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
--
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
 
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index ec7d39d..4ef2386 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -18,7 +18,6 @@
 // scalastyle:off println
 package org.apache.spark.examples.streaming.clickstream
 
-import org.apache.spark.SparkContext._
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.examples.streaming.StreamingExamples
 // scalastyle:off
@@ -88,7 +87,7 @@ object PageViewStream {
 
 // An external dataset we want to join to this stream
 val userList = ssc.sparkContext.parallelize(
-   Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq)
+   Map(1 -> "Patrick Wendell", 2 -> "Reynold Xin", 3 -> "Matei 
Zaharia").toSeq)
 
 metric match {
   case "pageCounts" => pageCounts.print()
@@ -106,6 +105,7 @@ object PageViewStream {
 }
 
 ssc.start()
+ssc.awaitTermination()
   }
 }
 // scalastyle:on println





spark git commit: [SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for current_timestamp).

2015-11-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 540bf58f1 -> fbad920db


[SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for 
current_timestamp).

This patch adds an alias for current_timestamp (now function).

Also fixes SPARK-9196 to re-enable the test case for current_timestamp.
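
A hedged usage sketch (assuming an existing SQLContext named sqlContext, as in
the test suite): within a single query the alias and the original function
evaluate to the same value.

  sqlContext.sql("SELECT NOW() = CURRENT_TIMESTAMP()").show() // true
  sqlContext.sql("SELECT NOW(), CURRENT_TIMESTAMP()").show()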

Author: Reynold Xin 

Closes #9753 from rxin/SPARK-11768.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbad920d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbad920d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbad920d

Branch: refs/heads/master
Commit: fbad920dbfd6f389dea852cdc159cacb0f4f6997
Parents: 540bf58
Author: Reynold Xin 
Authored: Mon Nov 16 20:47:46 2015 -0800
Committer: Reynold Xin 
Committed: Mon Nov 16 20:47:46 2015 -0800

--
 .../sql/catalyst/analysis/FunctionRegistry.scala  |  1 +
 .../org/apache/spark/sql/DateFunctionsSuite.scala | 18 --
 2 files changed, 13 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fbad920d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index a8f4d25..f9c04d7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -244,6 +244,7 @@ object FunctionRegistry {
 expression[AddMonths]("add_months"),
 expression[CurrentDate]("current_date"),
 expression[CurrentTimestamp]("current_timestamp"),
+expression[CurrentTimestamp]("now"),
 expression[DateDiff]("datediff"),
 expression[DateAdd]("date_add"),
 expression[DateFormatClass]("date_format"),

http://git-wip-us.apache.org/repos/asf/spark/blob/fbad920d/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 1266d53..241cbd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -38,15 +38,21 @@ class DateFunctionsSuite extends QueryTest with 
SharedSQLContext {
 assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
   }
 
-  // This is a bad test. SPARK-9196 will fix it and re-enable it.
-  ignore("function current_timestamp") {
+  test("function current_timestamp and now") {
 val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
 checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))
+
 // Execution in one query should return the same value
-checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""),
-  Row(true))
-assert(math.abs(sql("""SELECT 
CURRENT_TIMESTAMP()""").collect().head.getTimestamp(
-  0).getTime - System.currentTimeMillis()) < 5000)
+checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), 
Row(true))
+
+// Current timestamp should return the current timestamp ...
+val before = System.currentTimeMillis
+val got = sql("SELECT 
CURRENT_TIMESTAMP()").collect().head.getTimestamp(0).getTime
+val after = System.currentTimeMillis
+assert(got >= before && got <= after)
+
+// Now alias
+checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = NOW()"""), Row(true))
   }
 
   val sdf = new SimpleDateFormat("-MM-dd HH:mm:ss")





spark git commit: [SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for current_timestamp).

2015-11-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 5a6f40459 -> 07ac8e950


[SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for 
current_timestamp).

This patch adds an alias for current_timestamp (now function).

Also fixes SPARK-9196 to re-enable the test case for current_timestamp.

Author: Reynold Xin 

Closes #9753 from rxin/SPARK-11768.

(cherry picked from commit fbad920dbfd6f389dea852cdc159cacb0f4f6997)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07ac8e95
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07ac8e95
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07ac8e95

Branch: refs/heads/branch-1.6
Commit: 07ac8e95070ef1cb22641de9159479294a6cd774
Parents: 5a6f404
Author: Reynold Xin 
Authored: Mon Nov 16 20:47:46 2015 -0800
Committer: Reynold Xin 
Committed: Mon Nov 16 20:47:59 2015 -0800

--
 .../sql/catalyst/analysis/FunctionRegistry.scala  |  1 +
 .../org/apache/spark/sql/DateFunctionsSuite.scala | 18 --
 2 files changed, 13 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07ac8e95/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index a8f4d25..f9c04d7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -244,6 +244,7 @@ object FunctionRegistry {
 expression[AddMonths]("add_months"),
 expression[CurrentDate]("current_date"),
 expression[CurrentTimestamp]("current_timestamp"),
+expression[CurrentTimestamp]("now"),
 expression[DateDiff]("datediff"),
 expression[DateAdd]("date_add"),
 expression[DateFormatClass]("date_format"),

http://git-wip-us.apache.org/repos/asf/spark/blob/07ac8e95/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 1266d53..241cbd0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -38,15 +38,21 @@ class DateFunctionsSuite extends QueryTest with 
SharedSQLContext {
 assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1)
   }
 
-  // This is a bad test. SPARK-9196 will fix it and re-enable it.
-  ignore("function current_timestamp") {
+  test("function current_timestamp and now") {
 val df1 = Seq((1, 2), (3, 1)).toDF("a", "b")
 checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1))
+
 // Execution in one query should return the same value
-checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""),
-  Row(true))
-assert(math.abs(sql("""SELECT 
CURRENT_TIMESTAMP()""").collect().head.getTimestamp(
-  0).getTime - System.currentTimeMillis()) < 5000)
+checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), 
Row(true))
+
+// Current timestamp should return the current timestamp ...
+val before = System.currentTimeMillis
+val got = sql("SELECT 
CURRENT_TIMESTAMP()").collect().head.getTimestamp(0).getTime
+val after = System.currentTimeMillis
+assert(got >= before && got <= after)
+
+// Now alias
+checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = NOW()"""), Row(true))
   }
 
   val sdf = new SimpleDateFormat("-MM-dd HH:mm:ss")





spark git commit: [SPARK-11553][SQL] Primitive Row accessors should not convert null to default value

2015-11-16 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master bcea0bfda -> 31296628a


[SPARK-11553][SQL] Primitive Row accessors should not convert null to default 
value

Invoking a getter for a type extending AnyVal returns the default value (when the
field value is null) instead of throwing an NPE. Please check the comments on the
SPARK-11553 issue for more details.
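
In practice callers should guard the primitive getters with isNullAt, as the
updated Row scaladoc suggests; a minimal hedged sketch (the row contents are
illustrative):

  import org.apache.spark.sql.Row

  val row = Row(null, 42)

  // getInt(0) now throws NullPointerException instead of silently returning 0.
  val first: Option[Int] = if (row.isNullAt(0)) None else Some(row.getInt(0))
  val second: Int = row.getInt(1) // position 1 is non-null, so this is safe

  println(s"first=$first, second=$second")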

Author: Bartlomiej Alberski 

Closes #9642 from alberskib/bugfix/SPARK-11553.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31296628
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31296628
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31296628

Branch: refs/heads/master
Commit: 31296628ac7cd7be71e0edca335dc8604f62bb47
Parents: bcea0bf
Author: Bartlomiej Alberski 
Authored: Mon Nov 16 15:14:38 2015 -0800
Committer: Michael Armbrust 
Committed: Mon Nov 16 15:14:38 2015 -0800

--
 .../main/scala/org/apache/spark/sql/Row.scala   | 32 -
 .../scala/org/apache/spark/sql/RowTest.scala| 20 +++
 .../local/NestedLoopJoinNodeSuite.scala | 36 
 3 files changed, 65 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31296628/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
index 0f0f200..b14c66c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
@@ -191,7 +191,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getBoolean(i: Int): Boolean = getAs[Boolean](i)
+  def getBoolean(i: Int): Boolean = getAnyValAs[Boolean](i)
 
   /**
* Returns the value at position i as a primitive byte.
@@ -199,7 +199,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getByte(i: Int): Byte = getAs[Byte](i)
+  def getByte(i: Int): Byte = getAnyValAs[Byte](i)
 
   /**
* Returns the value at position i as a primitive short.
@@ -207,7 +207,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getShort(i: Int): Short = getAs[Short](i)
+  def getShort(i: Int): Short = getAnyValAs[Short](i)
 
   /**
* Returns the value at position i as a primitive int.
@@ -215,7 +215,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getInt(i: Int): Int = getAs[Int](i)
+  def getInt(i: Int): Int = getAnyValAs[Int](i)
 
   /**
* Returns the value at position i as a primitive long.
@@ -223,7 +223,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getLong(i: Int): Long = getAs[Long](i)
+  def getLong(i: Int): Long = getAnyValAs[Long](i)
 
   /**
* Returns the value at position i as a primitive float.
@@ -232,7 +232,7 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getFloat(i: Int): Float = getAs[Float](i)
+  def getFloat(i: Int): Float = getAnyValAs[Float](i)
 
   /**
* Returns the value at position i as a primitive double.
@@ -240,13 +240,12 @@ trait Row extends Serializable {
* @throws ClassCastException when data type does not match.
* @throws NullPointerException when value is null.
*/
-  def getDouble(i: Int): Double = getAs[Double](i)
+  def getDouble(i: Int): Double = getAnyValAs[Double](i)
 
   /**
* Returns the value at position i as a String object.
*
* @throws ClassCastException when data type does not match.
-   * @throws NullPointerException when value is null.
*/
   def getString(i: Int): String = getAs[String](i)
 
@@ -318,6 +317,8 @@ trait Row extends Serializable {
 
   /**
* Returns the value at position i.
+   * For primitive types if value is null it returns 'zero value' specific for 
primitive
+   * ie. 0 for Int - use isNullAt to ensure that value is not null
*
* @throws ClassCastException when data type does not match.
*/
@@ -325,6 +326,8 @@ trait Row extends Serializable {
 
   /**
* Returns the value of a given 

spark git commit: [SPARK-11612][ML] Pipeline and PipelineModel persistence

2015-11-16 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 32a69e4c1 -> 505eceef3


[SPARK-11612][ML] Pipeline and PipelineModel persistence

Pipeline and PipelineModel extend Readable and Writable.  Persistence succeeds 
only when all stages are Writable.

Note: This PR reinstates tests for other read/write functionality.  It should 
probably not get merged until 
[https://issues.apache.org/jira/browse/SPARK-11672] gets fixed.
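
A hedged usage sketch of the new persistence API (the stages and path are
illustrative, and it assumes every stage in the pipeline implements Writable):

  import org.apache.spark.ml.Pipeline
  import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

  // Illustrative stages; write will fail if any stage is not Writable.
  val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
  val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")

  val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF))
  pipeline.write.save("/tmp/pipeline-demo")  // hypothetical path

  val restored = Pipeline.load("/tmp/pipeline-demo")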

CC: mengxr

Author: Joseph K. Bradley 

Closes #9674 from jkbradley/pipeline-io.

(cherry picked from commit 1c5475f1401d2233f4c61f213d1e2c2ee9673067)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/505eceef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/505eceef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/505eceef

Branch: refs/heads/branch-1.6
Commit: 505eceef303e3291253b35164fbec7e4390e8252
Parents: 32a69e4
Author: Joseph K. Bradley 
Authored: Mon Nov 16 17:12:39 2015 -0800
Committer: Joseph K. Bradley 
Committed: Mon Nov 16 17:12:48 2015 -0800

--
 .../scala/org/apache/spark/ml/Pipeline.scala| 175 ++-
 .../org/apache/spark/ml/util/ReadWrite.scala|   4 +-
 .../org/apache/spark/ml/PipelineSuite.scala | 120 -
 .../spark/ml/util/DefaultReadWriteTest.scala|  25 +--
 4 files changed, 306 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/505eceef/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index a3e5940..25f0c69 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -22,12 +22,19 @@ import java.{util => ju}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.Logging
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.ml.param.{Param, ParamMap, Params}
-import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.ml.util.Reader
+import org.apache.spark.ml.util.Writer
+import org.apache.spark.ml.util._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -82,7 +89,7 @@ abstract class PipelineStage extends Params with Logging {
  * an identity transformer.
  */
 @Experimental
-class Pipeline(override val uid: String) extends Estimator[PipelineModel] {
+class Pipeline(override val uid: String) extends Estimator[PipelineModel] with 
Writable {
 
   def this() = this(Identifiable.randomUID("pipeline"))
 
@@ -166,6 +173,131 @@ class Pipeline(override val uid: String) extends 
Estimator[PipelineModel] {
   "Cannot have duplicate components in a pipeline.")
 theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur))
   }
+
+  override def write: Writer = new Pipeline.PipelineWriter(this)
+}
+
+object Pipeline extends Readable[Pipeline] {
+
+  override def read: Reader[Pipeline] = new PipelineReader
+
+  override def load(path: String): Pipeline = read.load(path)
+
+  private[ml] class PipelineWriter(instance: Pipeline) extends Writer {
+
+SharedReadWrite.validateStages(instance.getStages)
+
+override protected def saveImpl(path: String): Unit =
+  SharedReadWrite.saveImpl(instance, instance.getStages, sc, path)
+  }
+
+  private[ml] class PipelineReader extends Reader[Pipeline] {
+
+/** Checked against metadata when loading model */
+private val className = "org.apache.spark.ml.Pipeline"
+
+override def load(path: String): Pipeline = {
+  val (uid: String, stages: Array[PipelineStage]) = 
SharedReadWrite.load(className, sc, path)
+  new Pipeline(uid).setStages(stages)
+}
+  }
+
+  /** Methods for [[Reader]] and [[Writer]] shared between [[Pipeline]] and 
[[PipelineModel]] */
+  private[ml] object SharedReadWrite {
+
+import org.json4s.JsonDSL._
+
+/** Check that all stages are Writable */
+def validateStages(stages: Array[PipelineStage]): Unit = {
+  stages.foreach {
+case stage: Writable => // good
+case other =>
+  throw new UnsupportedOperationException("Pipeline write will fail on 
this Pipeline" +
+s" because it contains a stage which does not implement Writable. 
Non-Writable stage:" +
+s" ${other.uid} of 

spark git commit: [SPARK-11612][ML] Pipeline and PipelineModel persistence

2015-11-16 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master bd10eb81c -> 1c5475f14


[SPARK-11612][ML] Pipeline and PipelineModel persistence

Pipeline and PipelineModel extend Readable and Writable.  Persistence succeeds 
only when all stages are Writable.

Note: This PR reinstates tests for other read/write functionality.  It should 
probably not get merged until 
[https://issues.apache.org/jira/browse/SPARK-11672] gets fixed.

CC: mengxr

Author: Joseph K. Bradley 

Closes #9674 from jkbradley/pipeline-io.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c5475f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c5475f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c5475f1

Branch: refs/heads/master
Commit: 1c5475f1401d2233f4c61f213d1e2c2ee9673067
Parents: bd10eb8
Author: Joseph K. Bradley 
Authored: Mon Nov 16 17:12:39 2015 -0800
Committer: Joseph K. Bradley 
Committed: Mon Nov 16 17:12:39 2015 -0800

--
 .../scala/org/apache/spark/ml/Pipeline.scala| 175 ++-
 .../org/apache/spark/ml/util/ReadWrite.scala|   4 +-
 .../org/apache/spark/ml/PipelineSuite.scala | 120 -
 .../spark/ml/util/DefaultReadWriteTest.scala|  25 +--
 4 files changed, 306 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1c5475f1/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index a3e5940..25f0c69 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -22,12 +22,19 @@ import java.{util => ju}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.Logging
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.ml.param.{Param, ParamMap, Params}
-import org.apache.spark.ml.util.Identifiable
+import org.apache.spark.ml.util.Reader
+import org.apache.spark.ml.util.Writer
+import org.apache.spark.ml.util._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -82,7 +89,7 @@ abstract class PipelineStage extends Params with Logging {
  * an identity transformer.
  */
 @Experimental
-class Pipeline(override val uid: String) extends Estimator[PipelineModel] {
+class Pipeline(override val uid: String) extends Estimator[PipelineModel] with 
Writable {
 
   def this() = this(Identifiable.randomUID("pipeline"))
 
@@ -166,6 +173,131 @@ class Pipeline(override val uid: String) extends 
Estimator[PipelineModel] {
   "Cannot have duplicate components in a pipeline.")
 theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur))
   }
+
+  override def write: Writer = new Pipeline.PipelineWriter(this)
+}
+
+object Pipeline extends Readable[Pipeline] {
+
+  override def read: Reader[Pipeline] = new PipelineReader
+
+  override def load(path: String): Pipeline = read.load(path)
+
+  private[ml] class PipelineWriter(instance: Pipeline) extends Writer {
+
+SharedReadWrite.validateStages(instance.getStages)
+
+override protected def saveImpl(path: String): Unit =
+  SharedReadWrite.saveImpl(instance, instance.getStages, sc, path)
+  }
+
+  private[ml] class PipelineReader extends Reader[Pipeline] {
+
+/** Checked against metadata when loading model */
+private val className = "org.apache.spark.ml.Pipeline"
+
+override def load(path: String): Pipeline = {
+  val (uid: String, stages: Array[PipelineStage]) = 
SharedReadWrite.load(className, sc, path)
+  new Pipeline(uid).setStages(stages)
+}
+  }
+
+  /** Methods for [[Reader]] and [[Writer]] shared between [[Pipeline]] and 
[[PipelineModel]] */
+  private[ml] object SharedReadWrite {
+
+import org.json4s.JsonDSL._
+
+/** Check that all stages are Writable */
+def validateStages(stages: Array[PipelineStage]): Unit = {
+  stages.foreach {
+case stage: Writable => // good
+case other =>
+  throw new UnsupportedOperationException("Pipeline write will fail on 
this Pipeline" +
+s" because it contains a stage which does not implement Writable. 
Non-Writable stage:" +
+s" ${other.uid} of type ${other.getClass}")
+  }
+}
+
+/**
+ * Save metadata and stages for a [[Pipeline]] or [[PipelineModel]]
+ *  - 

spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

2015-11-16 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 505eceef3 -> e12ecfa36


[SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

The code was using the wrong API to add data to the internal composite
buffer, causing buffers to leak in certain situations. Use the right
API and enhance the tests to catch memory leaks.

Also, avoid reusing the composite buffers when downstream handlers keep
references to them; this seems to cause a few different issues even though
the ref counting code seems to be correct, so instead pay the cost of copying
a few bytes when that situation happens.
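
The fix replaces writeBytes with addComponent; addComponent does not advance the
composite buffer's writer index, so it has to be bumped explicitly. A small
hedged Scala sketch of that pattern (assuming Netty on the classpath; the
payload is illustrative):

  import java.nio.charset.StandardCharsets
  import io.netty.buffer.{ByteBuf, CompositeByteBuf, Unpooled}

  val composite: CompositeByteBuf = Unpooled.compositeBuffer()
  val in: ByteBuf = Unpooled.wrappedBuffer("payload".getBytes(StandardCharsets.UTF_8))

  // addComponent leaves the writer index untouched, so advance it explicitly,
  // mirroring the fixed line in the diff below.
  composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes())

  println(composite.readableBytes()) // 7
  composite.release()                // releasing the composite releases its components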

Author: Marcelo Vanzin 

Closes #9619 from vanzin/SPARK-11617.

(cherry picked from commit 540bf58f18328c68107d6c616ffd70f3a4640054)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e12ecfa3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e12ecfa3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e12ecfa3

Branch: refs/heads/branch-1.6
Commit: e12ecfa360c54add488096493ce0dba15cbf3f8d
Parents: 505ecee
Author: Marcelo Vanzin 
Authored: Mon Nov 16 17:28:11 2015 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 16 17:28:22 2015 -0800

--
 .../network/util/TransportFrameDecoder.java |  47 --
 .../util/TransportFrameDecoderSuite.java| 145 +++
 2 files changed, 151 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
--
diff --git 
a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
 
b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 272ea84..5889562 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -56,32 +56,43 @@ public class TransportFrameDecoder extends 
ChannelInboundHandlerAdapter {
   buffer = in.alloc().compositeBuffer();
 }
 
-buffer.writeBytes(in);
+buffer.addComponent(in).writerIndex(buffer.writerIndex() + 
in.readableBytes());
 
 while (buffer.isReadable()) {
-  feedInterceptor();
-  if (interceptor != null) {
-continue;
-  }
+  discardReadBytes();
+  if (!feedInterceptor()) {
+ByteBuf frame = decodeNext();
+if (frame == null) {
+  break;
+}
 
-  ByteBuf frame = decodeNext();
-  if (frame != null) {
 ctx.fireChannelRead(frame);
-  } else {
-break;
   }
 }
 
-// We can't discard read sub-buffers if there are other references to the 
buffer (e.g.
-// through slices used for framing). This assumes that code that retains 
references
-// will call retain() from the thread that called "fireChannelRead()" 
above, otherwise
-// ref counting will go awry.
-if (buffer != null && buffer.refCnt() == 1) {
+discardReadBytes();
+  }
+
+  private void discardReadBytes() {
+// If the buffer's been retained by downstream code, then make a copy of 
the remaining
+// bytes into a new buffer. Otherwise, just discard stale components.
+if (buffer.refCnt() > 1) {
+  CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer();
+
+  if (buffer.readableBytes() > 0) {
+ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes());
+spillBuf.writeBytes(buffer);
+newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes());
+  }
+
+  buffer.release();
+  buffer = newBuffer;
+} else {
   buffer.discardReadComponents();
 }
   }
 
-  protected ByteBuf decodeNext() throws Exception {
+  private ByteBuf decodeNext() throws Exception {
 if (buffer.readableBytes() < LENGTH_SIZE) {
   return null;
 }
@@ -127,10 +138,14 @@ public class TransportFrameDecoder extends 
ChannelInboundHandlerAdapter {
 this.interceptor = interceptor;
   }
 
-  private void feedInterceptor() throws Exception {
+  /**
+   * @return Whether the interceptor is still active after processing the data.
+   */
+  private boolean feedInterceptor() throws Exception {
 if (interceptor != null && !interceptor.handle(buffer)) {
   interceptor = null;
 }
+return interceptor != null;
   }
 
   public static interface Interceptor {

http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
--
diff 

[1/2] spark git commit: Preparing Spark release v1.6.0-preview

2015-11-16 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e12ecfa36 -> 5a6f40459


Preparing Spark release v1.6.0-preview


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31db3610
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31db3610
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31db3610

Branch: refs/heads/branch-1.6
Commit: 31db361003715dc4f41176bd6cf572874f0b2676
Parents: e12ecfa
Author: Patrick Wendell 
Authored: Mon Nov 16 18:43:43 2015 -0800
Committer: Patrick Wendell 
Committed: Mon Nov 16 18:43:43 2015 -0800

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 docker-integration-tests/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tags/pom.xml| 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 35 files changed, 35 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 4b60ee0..fbabaa5 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0-SNAPSHOT
+1.6.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 672e946..1b3e417 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0-SNAPSHOT
+1.6.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index 37e3f16..d32b93b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0-SNAPSHOT
+1.6.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/docker-integration-tests/pom.xml
--
diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml
index dee0c4a..ee9de91 100644
--- a/docker-integration-tests/pom.xml
+++ b/docker-integration-tests/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0-SNAPSHOT
+1.6.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index f5ab2a7..37b15bb 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0-SNAPSHOT
+1.6.0
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index dceedcf..295455a 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0-SNAPSHOT
+1.6.0
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/external/flume-sink/pom.xml

[2/2] spark git commit: Preparing development version 1.6.0-SNAPSHOT

2015-11-16 Thread pwendell
Preparing development version 1.6.0-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a6f4045
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a6f4045
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a6f4045

Branch: refs/heads/branch-1.6
Commit: 5a6f40459066bfa5c9221b2b64a9f8f13ee5013e
Parents: 31db361
Author: Patrick Wendell 
Authored: Mon Nov 16 18:43:49 2015 -0800
Committer: Patrick Wendell 
Committed: Mon Nov 16 18:43:49 2015 -0800

--
 assembly/pom.xml| 2 +-
 bagel/pom.xml   | 2 +-
 core/pom.xml| 2 +-
 docker-integration-tests/pom.xml| 2 +-
 examples/pom.xml| 2 +-
 external/flume-assembly/pom.xml | 2 +-
 external/flume-sink/pom.xml | 2 +-
 external/flume/pom.xml  | 2 +-
 external/kafka-assembly/pom.xml | 2 +-
 external/kafka/pom.xml  | 2 +-
 external/mqtt-assembly/pom.xml  | 2 +-
 external/mqtt/pom.xml   | 2 +-
 external/twitter/pom.xml| 2 +-
 external/zeromq/pom.xml | 2 +-
 extras/java8-tests/pom.xml  | 2 +-
 extras/kinesis-asl-assembly/pom.xml | 2 +-
 extras/kinesis-asl/pom.xml  | 2 +-
 extras/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml  | 2 +-
 launcher/pom.xml| 2 +-
 mllib/pom.xml   | 2 +-
 network/common/pom.xml  | 2 +-
 network/shuffle/pom.xml | 2 +-
 network/yarn/pom.xml| 2 +-
 pom.xml | 2 +-
 repl/pom.xml| 2 +-
 sql/catalyst/pom.xml| 2 +-
 sql/core/pom.xml| 2 +-
 sql/hive-thriftserver/pom.xml   | 2 +-
 sql/hive/pom.xml| 2 +-
 streaming/pom.xml   | 2 +-
 tags/pom.xml| 2 +-
 tools/pom.xml   | 2 +-
 unsafe/pom.xml  | 2 +-
 yarn/pom.xml| 2 +-
 35 files changed, 35 insertions(+), 35 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index fbabaa5..4b60ee0 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0
+1.6.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/bagel/pom.xml
--
diff --git a/bagel/pom.xml b/bagel/pom.xml
index 1b3e417..672e946 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0
+1.6.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/core/pom.xml
--
diff --git a/core/pom.xml b/core/pom.xml
index d32b93b..37e3f16 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0
+1.6.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/docker-integration-tests/pom.xml
--
diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml
index ee9de91..dee0c4a 100644
--- a/docker-integration-tests/pom.xml
+++ b/docker-integration-tests/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0
+1.6.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/examples/pom.xml
--
diff --git a/examples/pom.xml b/examples/pom.xml
index 37b15bb..f5ab2a7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0
+1.6.0-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/external/flume-assembly/pom.xml
--
diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml
index 295455a..dceedcf 100644
--- a/external/flume-assembly/pom.xml
+++ b/external/flume-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.10
-1.6.0
+1.6.0-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/external/flume-sink/pom.xml
--
diff --git a/external/flume-sink/pom.xml 

Git Push Summary

2015-11-16 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v1.6.0-preview [created] 31db36100




spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

2015-11-16 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 1c5475f14 -> 540bf58f1


[SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.

The code was using the wrong API to add data to the internal composite
buffer, causing buffers to leak in certain situations. Use the right
API and enhance the tests to catch memory leaks.

Also, avoid reusing the composite buffers when downstream handlers keep
references to them; this seems to cause a few different issues even though
the ref counting code seems to be correct, so instead pay the cost of copying
a few bytes when that situation happens.

Author: Marcelo Vanzin 

Closes #9619 from vanzin/SPARK-11617.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bf58f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bf58f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bf58f

Branch: refs/heads/master
Commit: 540bf58f18328c68107d6c616ffd70f3a4640054
Parents: 1c5475f
Author: Marcelo Vanzin 
Authored: Mon Nov 16 17:28:11 2015 -0800
Committer: Marcelo Vanzin 
Committed: Mon Nov 16 17:28:11 2015 -0800

--
 .../network/util/TransportFrameDecoder.java |  47 --
 .../util/TransportFrameDecoderSuite.java| 145 +++
 2 files changed, 151 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
--
diff --git 
a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
 
b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
index 272ea84..5889562 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java
@@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
   buffer = in.alloc().compositeBuffer();
 }
 
-buffer.writeBytes(in);
+buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes());
 
 while (buffer.isReadable()) {
-  feedInterceptor();
-  if (interceptor != null) {
-continue;
-  }
+  discardReadBytes();
+  if (!feedInterceptor()) {
+ByteBuf frame = decodeNext();
+if (frame == null) {
+  break;
+}
 
-  ByteBuf frame = decodeNext();
-  if (frame != null) {
 ctx.fireChannelRead(frame);
-  } else {
-break;
   }
 }
 
-// We can't discard read sub-buffers if there are other references to the buffer (e.g.
-// through slices used for framing). This assumes that code that retains references
-// will call retain() from the thread that called "fireChannelRead()" above, otherwise
-// ref counting will go awry.
-if (buffer != null && buffer.refCnt() == 1) {
+discardReadBytes();
+  }
+
+  private void discardReadBytes() {
+// If the buffer's been retained by downstream code, then make a copy of the remaining
+// bytes into a new buffer. Otherwise, just discard stale components.
+if (buffer.refCnt() > 1) {
+  CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer();
+
+  if (buffer.readableBytes() > 0) {
+ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes());
+spillBuf.writeBytes(buffer);
+newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes());
+  }
+
+  buffer.release();
+  buffer = newBuffer;
+} else {
   buffer.discardReadComponents();
 }
   }
 
-  protected ByteBuf decodeNext() throws Exception {
+  private ByteBuf decodeNext() throws Exception {
 if (buffer.readableBytes() < LENGTH_SIZE) {
   return null;
 }
@@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter {
 this.interceptor = interceptor;
   }
 
-  private void feedInterceptor() throws Exception {
+  /**
+   * @return Whether the interceptor is still active after processing the data.
+   */
+  private boolean feedInterceptor() throws Exception {
 if (interceptor != null && !interceptor.handle(buffer)) {
   interceptor = null;
 }
+return interceptor != null;
   }
 
   public static interface Interceptor {

http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
--
diff --git 
a/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
 

spark git commit: [SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY

2015-11-16 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master fbad920db -> 75d202073


[SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata 
and add a test for FIXED_LEN_BYTE_ARRAY

As discussed in https://github.com/apache/spark/pull/9660 and
https://github.com/apache/spark/pull/9060, I cleaned up unused imports, added a
test for fixed-length byte arrays, and used a common function for writing
Parquet metadata.

For the fixed-length byte array test, I verified the encoding types with
[parquet-tools](https://github.com/Parquet/parquet-mr/tree/master/parquet-tools).
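
For reference, a hypothetical sketch of such a shared helper, assembled from the calls this patch removes from the individual tests; the real `writeMetadata` test utility may differ in shape.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType

object ParquetMetadataSketch {
  // Hypothetical shared helper: build footer metadata for `parquetSchema` with no
  // row groups and write the Parquet summary (_metadata) file at `path`.
  def writeMetadata(parquetSchema: MessageType, path: Path, conf: Configuration): Unit = {
    val extraMetadata = Map.empty[String, String].asJava
    val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
    val footers = List(
      new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
    ).asJava
    ParquetFileWriter.writeMetadataFile(conf, path, footers)
  }
}
```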

Author: hyukjinkwon 

Closes #9754 from HyukjinKwon/SPARK-11694-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75d20207
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75d20207
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75d20207

Branch: refs/heads/master
Commit: 75d202073143d5a7f943890d8682b5b0cf9e3092
Parents: fbad920
Author: hyukjinkwon 
Authored: Tue Nov 17 14:35:00 2015 +0800
Committer: Cheng Lian 
Committed: Tue Nov 17 14:35:00 2015 +0800

--
 .../src/test/resources/dec-in-fixed-len.parquet | Bin 0 -> 460 bytes
 .../datasources/parquet/ParquetIOSuite.scala|  42 +++
 2 files changed, 15 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/75d20207/sql/core/src/test/resources/dec-in-fixed-len.parquet
--
diff --git a/sql/core/src/test/resources/dec-in-fixed-len.parquet 
b/sql/core/src/test/resources/dec-in-fixed-len.parquet
new file mode 100644
index 000..6ad37d5
Binary files /dev/null and 
b/sql/core/src/test/resources/dec-in-fixed-len.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/75d20207/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a148fac..177ab42 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.Collections
-
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 
 import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, 
ParquetMetadata}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
   """.stripMargin)
 
 withTempPath { location =>
-  val extraMetadata = Map.empty[String, String].asJava
-  val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, 
"Spark")
   val path = new Path(location.getCanonicalPath)
-  val footer = List(
-new Footer(path, new ParquetMetadata(fileMetadata, 
Collections.emptyList()))
-  ).asJava
-
-  ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, 
path, footer)
-
+  val conf = sparkContext.hadoopConfiguration
+  writeMetadata(parquetSchema, path, conf)
   val errorMessage = intercept[Throwable] {
 sqlContext.read.parquet(path.toString).printSchema()
   }.toString
@@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSQLContext {
 |}
   """.stripMargin)
 
+val expectedSparkTypes = Seq(StringType, BinaryType)
+
 withTempPath { location =>
-  val extraMetadata = Map.empty[String, String].asJava
-  val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, 
"Spark")
   val path = new Path(location.getCanonicalPath)
-  val footer = List(
-new Footer(path, new ParquetMetadata(fileMetadata, 
Collections.emptyList()))
-  ).asJava
-
-  ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, 
path, footer)
-
-  val jsonDataType = 
sqlContext.read.parquet(path.toString).schema(0).dataType
-  assert(jsonDataType === 

spark git commit: [SPARK-11745][SQL] Enable more JSON parsing options

2015-11-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fd50fa4c3 -> 42de5253f


[SPARK-11745][SQL] Enable more JSON parsing options

This patch adds the following options to the JSON data source, for dealing with 
non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comments in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)
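
A minimal usage sketch, assuming a `SQLContext` named `sqlContext` and a placeholder path; the option names are the ones listed above.

```scala
// Read a JSON file that contains comments, unquoted field names and leading zeros.
// The path is only a placeholder for illustration.
val df = sqlContext.read
  .option("allowComments", "true")            // tolerate // and /* */ comments
  .option("allowUnquotedFieldNames", "true")  // {field: "value"}
  .option("allowSingleQuotes", "true")        // {'field': 'value'} (already the default)
  .option("allowNumericLeadingZeros", "true") // {"n": 00012}
  .json("/path/to/nonstandard.json")

df.printSchema()
```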

To avoid passing a lot of options throughout the json package, I introduced a 
new JSONOptions case class to define all JSON config options.

Also updated documentation to explain these options.

Scala

![screen shot 2015-11-15 at 6 12 12 
pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python

![screen shot 2015-11-15 at 6 11 28 
pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin 

Closes #9724 from rxin/SPARK-11745.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42de5253
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42de5253
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42de5253

Branch: refs/heads/master
Commit: 42de5253f327bd7ee258b0efb5024f3847fa3b51
Parents: fd50fa4
Author: Reynold Xin 
Authored: Mon Nov 16 00:06:14 2015 -0800
Committer: Reynold Xin 
Committed: Mon Nov 16 00:06:14 2015 -0800

--
 python/pyspark/sql/readwriter.py|  10 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  22 ++--
 .../apache/spark/sql/execution/SparkPlan.scala  |  17 +--
 .../datasources/json/InferSchema.scala  |  34 +++---
 .../datasources/json/JSONOptions.scala  |  64 +++
 .../datasources/json/JSONRelation.scala |  20 ++--
 .../datasources/json/JacksonParser.scala|  82 ++---
 .../json/JsonParsingOptionsSuite.scala  | 114 +++
 .../execution/datasources/json/JsonSuite.scala  |  29 +++--
 9 files changed, 286 insertions(+), 106 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/42de5253/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407..7b8ddb9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
  or RDD of Strings storing JSON objects.
 :param schema: an optional :class:`StructType` for the input schema.
 
+You can set the following JSON-specific options to deal with 
non-standard JSON files:
+* ``primitivesAsString`` (default ``false``): infers all primitive 
values as a string \
+type
+* ``allowComments`` (default ``false``): ignores Java/C++ style 
comment in JSON records
+* ``allowUnquotedFieldNames`` (default ``false``): allows unquoted 
JSON field names
+* ``allowSingleQuotes`` (default ``true``): allows single quotes 
in addition to double \
+quotes
+* ``allowNumericLeadingZeros`` (default ``false``): allows leading 
zeros in numbers \
+(e.g. 00012)
+
 >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
 >>> df1.dtypes
 [('age', 'bigint'), ('name', 'string')]

http://git-wip-us.apache.org/repos/asf/spark/blob/42de5253/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a4..5872fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, 
JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, 
JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, 
ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -227,6 +227,15 @@ 

spark git commit: [SPARK-11745][SQL] Enable more JSON parsing options

2015-11-16 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 053c63ecf -> a0f9cd77a


[SPARK-11745][SQL] Enable more JSON parsing options

This patch adds the following options to the JSON data source, for dealing with 
non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comments in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a 
new JSONOptions case class to define all JSON config options.
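
A hedged sketch of what an options case class of this kind could look like; the field names mirror the list above, but the actual `JSONOptions` class in `sql/core` may differ in shape and parsing.

```scala
// Illustrative only: collect the reader options into one value that can be passed
// around the json package instead of a long list of individual parameters.
case class JsonOptionsSketch(
    primitivesAsString: Boolean = false,
    allowComments: Boolean = false,
    allowUnquotedFieldNames: Boolean = false,
    allowSingleQuotes: Boolean = true,
    allowNumericLeadingZeros: Boolean = false)

object JsonOptionsSketch {
  // Build from the string-valued options map handed to the data source; unset keys
  // fall back to the defaults above.
  def fromMap(options: Map[String, String]): JsonOptionsSketch = JsonOptionsSketch(
    primitivesAsString = options.get("primitivesAsString").exists(_.toBoolean),
    allowComments = options.get("allowComments").exists(_.toBoolean),
    allowUnquotedFieldNames = options.get("allowUnquotedFieldNames").exists(_.toBoolean),
    allowSingleQuotes = options.get("allowSingleQuotes").forall(_.toBoolean),
    allowNumericLeadingZeros = options.get("allowNumericLeadingZeros").exists(_.toBoolean))
}
```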

Also updated documentation to explain these options.

Scala

![screen shot 2015-11-15 at 6 12 12 
pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python

![screen shot 2015-11-15 at 6 11 28 
pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin 

Closes #9724 from rxin/SPARK-11745.

(cherry picked from commit 42de5253f327bd7ee258b0efb5024f3847fa3b51)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0f9cd77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0f9cd77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0f9cd77

Branch: refs/heads/branch-1.6
Commit: a0f9cd77a73ae911d382ac8ddd52614800663704
Parents: 053c63e
Author: Reynold Xin 
Authored: Mon Nov 16 00:06:14 2015 -0800
Committer: Reynold Xin 
Committed: Mon Nov 16 00:06:21 2015 -0800

--
 python/pyspark/sql/readwriter.py|  10 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  22 ++--
 .../apache/spark/sql/execution/SparkPlan.scala  |  17 +--
 .../datasources/json/InferSchema.scala  |  34 +++---
 .../datasources/json/JSONOptions.scala  |  64 +++
 .../datasources/json/JSONRelation.scala |  20 ++--
 .../datasources/json/JacksonParser.scala|  82 ++---
 .../json/JsonParsingOptionsSuite.scala  | 114 +++
 .../execution/datasources/json/JsonSuite.scala  |  29 +++--
 9 files changed, 286 insertions(+), 106 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407..7b8ddb9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
  or RDD of Strings storing JSON objects.
 :param schema: an optional :class:`StructType` for the input schema.
 
+You can set the following JSON-specific options to deal with 
non-standard JSON files:
+* ``primitivesAsString`` (default ``false``): infers all primitive 
values as a string \
+type
+* ``allowComments`` (default ``false``): ignores Java/C++ style 
comment in JSON records
+* ``allowUnquotedFieldNames`` (default ``false``): allows unquoted 
JSON field names
+* ``allowSingleQuotes`` (default ``true``): allows single quotes 
in addition to double \
+quotes
+* ``allowNumericLeadingZeros`` (default ``false``): allows leading 
zeros in numbers \
+(e.g. 00012)
+
 >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
 >>> df1.dtypes
 [('age', 'bigint'), ('name', 'string')]

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a4..5872fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, 
JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, 
JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import 

spark git commit: [SPARK-11447][SQL] change NullType to StringType during binaryComparison between NullType and StringType

2015-11-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 07ac8e950 -> 113410c12


[SPARK-11447][SQL] change NullType to StringType during binaryComparison 
between NullType and StringType

During execution of the PromoteStrings rule, if one side of a binary comparison
is StringType and the other side is not, the current code promotes (casts) the
StringType side to DoubleType; when the string does not contain a number, the
cast yields null. So a <=> (null-safe equal) comparison with NULL does not
filter anything, which causes the problem reported in this JIRA.

I propose these changes through this PR; please review them.

This problem only happens for <=>; other operators work fine, as the transcript
and the sketch below show.

scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+--+
|column|
+--+
+--+

scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+--+
|column|
+--+
+--+

scala> df.registerTempTable("DF")

scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]

scala> res27.show
+--+
|column|
+--+
+--+
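
A DataFrame-level sketch of the <=> case itself, modelled on the new test added to `ColumnExpressionSuite`; it assumes a `SparkContext` named `sc` and a `SQLContext` named `sqlContext` are in scope.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val nullData = sqlContext.createDataFrame(
  sc.parallelize(Row("abc") :: Row(null) :: Row("xyz") :: Nil),
  StructType(Seq(StructField("a", StringType, nullable = true))))

// Before this patch the string side was cast to DoubleType (yielding null), so the
// null-safe comparison kept every row; with the fix only the null row survives,
// matching the new test's expectation of Row(null) :: Nil.
nullData.filter(nullData("a") <=> null).show()
```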

Author: Kevin Yu 

Closes #9720 from kevinyu98/working_on_spark-11447.

(cherry picked from commit e01865af0d5ebe11033de46c388c5c583876c187)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/113410c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/113410c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/113410c1

Branch: refs/heads/branch-1.6
Commit: 113410c12529caf5d1f528efeba4d22489ed78ec
Parents: 07ac8e9
Author: Kevin Yu 
Authored: Mon Nov 16 22:54:29 2015 -0800
Committer: Yin Huai 
Committed: Mon Nov 16 22:55:13 2015 -0800

--
 .../spark/sql/catalyst/analysis/HiveTypeCoercion.scala   |  6 ++
 .../org/apache/spark/sql/ColumnExpressionSuite.scala | 11 +++
 2 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/113410c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 92188ee..f90fc3c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -281,6 +281,12 @@ object HiveTypeCoercion {
   case p @ BinaryComparison(left @ DateType(), right @ TimestampType()) =>
 p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
 
+  // Checking NullType
+  case p @ BinaryComparison(left @ StringType(), right @ NullType()) =>
+p.makeCopy(Array(left, Literal.create(null, StringType)))
+  case p @ BinaryComparison(left @ NullType(), right @ StringType()) =>
+p.makeCopy(Array(Literal.create(null, StringType), right))
+
   case p @ BinaryComparison(left @ StringType(), right) if right.dataType 
!= StringType =>
 p.makeCopy(Array(Cast(left, DoubleType), right))
   case p @ BinaryComparison(left, right @ StringType()) if left.dataType 
!= StringType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/113410c1/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 3eae3f6..38c0eb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -368,6 +368,17 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
 checkAnswer(
   nullData.filter($"a" <=> $"b"),
   Row(1, 1) :: Row(null, null) :: Nil)
+
+val nullData2 = sqlContext.createDataFrame(sparkContext.parallelize(
+Row("abc") ::
+Row(null)  ::
+Row("xyz") :: Nil),
+StructType(Seq(StructField("a", StringType, true
+
+checkAnswer(
+  nullData2.filter($"a" <=> null),
+  Row(null) :: Nil)
+
   }
 
   test(">") {


-
To unsubscribe, e-mail: 

spark git commit: [MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master e01865af0 -> d79d8b08f


[MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite

The randomly generated ArrayData used for the UDT `ExamplePoint` in
`RowEncoderSuite` sometimes doesn't have enough elements. When that happens, the
test fails. This patch fixes it.
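
A small illustration of the failure mode and of the fallback the patch adds; the `GenericArrayData` value below is constructed by hand purely for demonstration.

```scala
import scala.util.Random

import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}

// A randomly generated array may carry fewer than the two doubles ExamplePoint needs.
val values: ArrayData = new GenericArrayData(Array[Any](Random.nextDouble()))

// Mirror of the guarded deserialization in the patch: use the array when it is long
// enough, otherwise fall back to random coordinates so the round-trip test can proceed.
val (x, y) =
  if (values.numElements() > 1) (values.getDouble(0), values.getDouble(1))
  else (Random.nextDouble(), Random.nextDouble())

println(s"ExamplePoint($x, $y)")
```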

Author: Liang-Chi Hsieh 

Closes #9757 from viirya/fix-randomgenerated-udt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d79d8b08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d79d8b08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d79d8b08

Branch: refs/heads/master
Commit: d79d8b08ff69b30b02fe87839e695e29bfea5ace
Parents: e01865a
Author: Liang-Chi Hsieh 
Authored: Mon Nov 16 23:16:17 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 23:16:17 2015 -0800

--
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala   | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d79d8b08/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index c868dde..46c6e0d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.encoders
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData}
@@ -59,7 +61,12 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
   override def deserialize(datum: Any): ExamplePoint = {
 datum match {
   case values: ArrayData =>
-new ExamplePoint(values.getDouble(0), values.getDouble(1))
+if (values.numElements() > 1) {
+  new ExamplePoint(values.getDouble(0), values.getDouble(1))
+} else {
+  val random = new Random()
+  new ExamplePoint(random.nextDouble(), random.nextDouble())
+}
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite

2015-11-16 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 113410c12 -> 2ae1fa074


[MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite

The randomly generated ArrayData used for the UDT `ExamplePoint` in
`RowEncoderSuite` sometimes doesn't have enough elements. When that happens, the
test fails. This patch fixes it.

Author: Liang-Chi Hsieh 

Closes #9757 from viirya/fix-randomgenerated-udt.

(cherry picked from commit d79d8b08ff69b30b02fe87839e695e29bfea5ace)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ae1fa07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ae1fa07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ae1fa07

Branch: refs/heads/branch-1.6
Commit: 2ae1fa074c2dedc67bdde0ac48e2af9476d4cba1
Parents: 113410c
Author: Liang-Chi Hsieh 
Authored: Mon Nov 16 23:16:17 2015 -0800
Committer: Davies Liu 
Committed: Mon Nov 16 23:16:33 2015 -0800

--
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala   | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ae1fa07/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index c868dde..46c6e0d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.encoders
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData}
@@ -59,7 +61,12 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
   override def deserialize(datum: Any): ExamplePoint = {
 datum match {
   case values: ArrayData =>
-new ExamplePoint(values.getDouble(0), values.getDouble(1))
+if (values.numElements() > 1) {
+  new ExamplePoint(values.getDouble(0), values.getDouble(1))
+} else {
+  val random = new Random()
+  new ExamplePoint(random.nextDouble(), random.nextDouble())
+}
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-11447][SQL] change NullType to StringType during binaryComparison between NullType and StringType

2015-11-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 75d202073 -> e01865af0


[SPARK-11447][SQL] change NullType to StringType during binaryComparison 
between NullType and StringType

During execution of the PromoteStrings rule, if one side of a binary comparison
is StringType and the other side is not, the current code promotes (casts) the
StringType side to DoubleType; when the string does not contain a number, the
cast yields null. So a <=> (null-safe equal) comparison with NULL does not
filter anything, which causes the problem reported in this JIRA.

I propose these changes through this PR; please review them.

This problem only happens for <=>; other operators work fine, as the transcript
and the sketch below show.

scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+--+
|column|
+--+
+--+

scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+--+
|column|
+--+
+--+

scala> df.registerTempTable("DF")

scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]

scala> res27.show
+--+
|column|
+--+
+--+
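
Continuing the transcript above, the null-safe form of the same query is where the bug showed up; a hedged sketch of the expected behaviour after the fix (the string literal 'column' is never NULL, so no rows should come back).

```scala
// Uses the temp table "DF" registered in the transcript above.
sqlContext.sql("select * from DF where 'column' <=> NULL").show()
// With the fix: empty result, because the string literal 'column' is not NULL.
// Before the fix: 'column' was cast to DoubleType, producing NULL, and
// NULL <=> NULL kept every row.
```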

Author: Kevin Yu 

Closes #9720 from kevinyu98/working_on_spark-11447.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e01865af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e01865af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e01865af

Branch: refs/heads/master
Commit: e01865af0d5ebe11033de46c388c5c583876c187
Parents: 75d2020
Author: Kevin Yu 
Authored: Mon Nov 16 22:54:29 2015 -0800
Committer: Yin Huai 
Committed: Mon Nov 16 22:54:29 2015 -0800

--
 .../spark/sql/catalyst/analysis/HiveTypeCoercion.scala   |  6 ++
 .../org/apache/spark/sql/ColumnExpressionSuite.scala | 11 +++
 2 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e01865af/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 92188ee..f90fc3c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -281,6 +281,12 @@ object HiveTypeCoercion {
   case p @ BinaryComparison(left @ DateType(), right @ TimestampType()) =>
 p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
 
+  // Checking NullType
+  case p @ BinaryComparison(left @ StringType(), right @ NullType()) =>
+p.makeCopy(Array(left, Literal.create(null, StringType)))
+  case p @ BinaryComparison(left @ NullType(), right @ StringType()) =>
+p.makeCopy(Array(Literal.create(null, StringType), right))
+
   case p @ BinaryComparison(left @ StringType(), right) if right.dataType 
!= StringType =>
 p.makeCopy(Array(Cast(left, DoubleType), right))
   case p @ BinaryComparison(left, right @ StringType()) if left.dataType 
!= StringType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e01865af/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 3eae3f6..38c0eb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -368,6 +368,17 @@ class ColumnExpressionSuite extends QueryTest with 
SharedSQLContext {
 checkAnswer(
   nullData.filter($"a" <=> $"b"),
   Row(1, 1) :: Row(null, null) :: Nil)
+
+val nullData2 = sqlContext.createDataFrame(sparkContext.parallelize(
+Row("abc") ::
+Row(null)  ::
+Row("xyz") :: Nil),
+StructType(Seq(StructField("a", StringType, true
+
+checkAnswer(
+  nullData2.filter($"a" <=> null),
+  Row(null) :: Nil)
+
   }
 
   test(">") {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org