spark git commit: [SPARK-11522][SQL] input_file_name() returns "" for external tables
Repository: spark
Updated Branches: refs/heads/branch-1.6 a0f9cd77a -> c37ed52ec

[SPARK-11522][SQL] input_file_name() returns "" for external tables

When computing a partition for a non-Parquet relation, `HadoopRDD.compute` is used, but it does not set the thread-local variable `inputFileName` the way `SqlNewHadoopRDD.compute` does. Yet when reading it back, `SqlNewHadoopRDD.inputFileName` is expected, which is empty in this case. Setting `inputFileName` in `HadoopRDD.compute` resolves this issue.

Author: xin Wu

Closes #9542 from xwu0226/SPARK-11522.

(cherry picked from commit 0e79604aed116bdcb40e03553a2d103b5b1cdbae) Signed-off-by: Yin Huai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c37ed52e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c37ed52e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c37ed52e Branch: refs/heads/branch-1.6 Commit: c37ed52ecd38a400f99cc82f07440e455b772db2 Parents: a0f9cd7 Author: xin Wu Authored: Mon Nov 16 08:10:48 2015 -0800 Committer: Yin Huai Committed: Mon Nov 16 08:10:59 2015 -0800

-- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 7 ++ .../spark/sql/hive/execution/HiveUDFSuite.scala | 93 +++- 2 files changed, 98 insertions(+), 2 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 0453614..7db5834 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -213,6 +213,12 @@ class HadoopRDD[K, V]( val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop) + // Sets the thread local variable for the file's name + split.inputSplit.value match { +case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString) +case _ => SqlNewHadoopRDD.unsetInputFileName() + } + // Find a function that will return the FileSystem bytes read by this thread. Do this before // creating RecordReader, because RecordReader's constructor might read some bytes val bytesReadCallback = inputMetrics.bytesReadCallback.orElse { @@ -250,6 +256,7 @@ class HadoopRDD[K, V]( override def close() { if (reader != null) { + SqlNewHadoopRDD.unsetInputFileName() // Close the reader and release it. Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases.
That bug can lead to non-deterministic http://git-wip-us.apache.org/repos/asf/spark/blob/c37ed52e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala -- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index 5ab477e..9deb1a6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution -import java.io.{DataInput, DataOutput} +import java.io.{PrintWriter, File, DataInput, DataOutput} import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.io.Writable +import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.util.Utils @@ -44,7 +45,7 @@ case class ListStringCaseClass(l: Seq[String]) /** * A test suite for Hive custom UDFs. */ -class HiveUDFSuite extends QueryTest with TestHiveSingleton { +class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { import hiveContext.{udf, sql} import hiveContext.implicits._ @@ -348,6 +349,94 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton { sqlContext.dropTempTable("testUDF") } + + test("SPARK-11522
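The fix mirrors the pattern `SqlNewHadoopRDD` already uses: a thread-local holder that the RDD sets before handing records to downstream expressions such as `input_file_name()`. Below is a minimal sketch of that pattern; `InputFileNameHolder` and `onSplitStart` are hypothetical stand-in names for illustration, while the real holder in 1.6 lives on `SqlNewHadoopRDD` itself.

```scala
import org.apache.hadoop.mapred.{FileSplit, InputSplit}

// Hypothetical stand-in for SqlNewHadoopRDD's thread-local file-name holder.
object InputFileNameHolder {
  private val inputFileName = new InheritableThreadLocal[String] {
    override def initialValue(): String = ""
  }
  def getInputFileName(): String = inputFileName.get()
  def setInputFileName(file: String): Unit = inputFileName.set(file)
  def unsetInputFileName(): Unit = inputFileName.remove()
}

// Mirrors what the patch adds to HadoopRDD.compute: publish the split's path
// before reading records, and clear it again when the reader is closed.
def onSplitStart(split: InputSplit): Unit = split match {
  case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
  case _ => InputFileNameHolder.unsetInputFileName()
}
```

With the fix in place, `SELECT input_file_name() FROM some_external_table` should return real file paths for tables read through `HadoopRDD`, instead of "".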
spark git commit: [SPARK-11044][SQL] Parquet writer version fixed as version1
Repository: spark
Updated Branches: refs/heads/master 42de5253f -> 7f8eb3bf6

[SPARK-11044][SQL] Parquet writer version fixed as version1

https://issues.apache.org/jira/browse/SPARK-11044

Spark writes a Parquet file only with writer version1, ignoring the writer version given by the user. So, in this PR, it keeps the writer version if one is given, or sets version1 as the default.

Author: hyukjinkwon
Author: HyukjinKwon

Closes #9060 from HyukjinKwon/SPARK-11044.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f8eb3bf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f8eb3bf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f8eb3bf Branch: refs/heads/master Commit: 7f8eb3bf6ed64eefc5472f5c5fb02e2db1e3f618 Parents: 42de525 Author: hyukjinkwon Authored: Mon Nov 16 21:30:10 2015 +0800 Committer: Cheng Lian Committed: Mon Nov 16 21:30:10 2015 +0800

-- .../parquet/CatalystWriteSupport.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala| 34 2 files changed, 35 insertions(+), 1 deletion(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/7f8eb3bf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 483363d..6862dea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport { def setSchema(schema: StructType, configuration: Configuration): Unit = { schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) configuration.set(SPARK_ROW_SCHEMA, schema.json) -configuration.set( +configuration.setIfUnset( ParquetOutputFormat.WRITER_VERSION, ParquetProperties.WriterVersion.PARQUET_1_0.toString) } http://git-wip-us.apache.org/repos/asf/spark/blob/7f8eb3bf/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 78df363..2aa5dca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Collections +import org.apache.parquet.column.{Encoding, ParquetProperties} + import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("SPARK-11044 Parquet writer version fixed as version1 ") { +// For dictionary encoding, Parquet changes the encoding types according to its writer +// version. So, this test checks one of the encoding types in order to ensure that +// the file is written with writer version2. +withTempPath { dir => + val clonedConf = new Configuration(hadoopConfiguration) + try { +// Write a Parquet file with writer version2.
+hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_2_0.toString) + +// By default, dictionary encoding is enabled from Parquet 1.2.0 but +// it is enabled just in case. +hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true) +val path = s"${dir.getCanonicalPath}/part-r-0.parquet" +sqlContext.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.mode("overwrite").parquet(path) + +val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head +val columnChunkMetadata = blockMetadata.getColumns.asScala.head + +// If the file is written with version2, this should include +// Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY + assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + }
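The one-line production change is `Configuration.set` → `Configuration.setIfUnset`, so a user-supplied writer version survives. A small sketch of the difference, using the same Parquet constants the patch touches:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.ParquetOutputFormat

val conf = new Configuration()

// The user picks writer version2 up front.
conf.set(ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)

// Before the fix, Spark called set(...) here and silently clobbered the
// user's choice with PARQUET_1_0; setIfUnset only supplies the default
// when nothing has been set yet.
conf.setIfUnset(ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_1_0.toString)

assert(conf.get(ParquetOutputFormat.WRITER_VERSION) ==
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)
```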
spark git commit: [SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds
Repository: spark
Updated Branches: refs/heads/branch-1.6 c37ed52ec -> 949c9b7c6

[SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds

Code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```
It would be good to add a regression test for it, but the reproducing code needs to change the default timezone, and even if we change it back, the `lazy val defaultTimeZone` in `DateTimeUtils` is already fixed.

Author: Wenchen Fan

Closes #9728 from cloud-fan/seconds.

(cherry picked from commit 06f1fdba6d1425afddfc1d45a20dbe9bede15e7a) Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/949c9b7c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/949c9b7c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/949c9b7c Branch: refs/heads/branch-1.6 Commit: 949c9b7c660c37bbe543adb260380afb1258089e Parents: c37ed52 Author: Wenchen Fan Authored: Mon Nov 16 08:58:40 2015 -0800 Committer: Davies Liu Committed: Mon Nov 16 08:58:50 2015 -0800

-- .../spark/sql/catalyst/util/DateTimeUtils.scala | 14 -- .../spark/sql/catalyst/util/DateTimeUtilsSuite.scala | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/949c9b7c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index deff8a5..8fb3f41 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -395,16 +395,19 @@ object DateTimeUtils { /** * Returns the microseconds since year zero (-17999) from microseconds since epoch. */ - def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = { + private def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = { microsec + toYearZero * MICROS_PER_DAY } + private def localTimestamp(microsec: SQLTimestamp): SQLTimestamp = { +absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L + } + /** * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds. */ def getHours(microsec: SQLTimestamp): Int = { -val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L -((localTs / MICROS_PER_SECOND / 3600) % 24).toInt +((localTimestamp(microsec) / MICROS_PER_SECOND / 3600) % 24).toInt } /** @@ -412,8 +415,7 @@ object DateTimeUtils { * microseconds. */ def getMinutes(microsec: SQLTimestamp): Int = { -val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L -((localTs / MICROS_PER_SECOND / 60) % 60).toInt +((localTimestamp(microsec) / MICROS_PER_SECOND / 60) % 60).toInt } /** @@ -421,7 +423,7 @@ object DateTimeUtils { * microseconds.
*/ def getSeconds(microsec: SQLTimestamp): Int = { -((absoluteMicroSecond(microsec) / MICROS_PER_SECOND) % 60).toInt +((localTimestamp(microsec) / MICROS_PER_SECOND) % 60).toInt } private[this] def isLeapYear(year: Int): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/949c9b7c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 64d15e6..60d4542 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -358,7 +358,7 @@ class DateTimeUtilsSuite extends SparkFunSuite { assert(getSeconds(c.getTimeInMillis * 1000) === 9) } - test("hours / miniute / seconds") { + test("hours / minutes / seconds") { Seq(Timestamp.valueOf("2015-06-11 10:12:35.789"), Timestamp.valueOf("2015-06-11 20:13:40.789"), Timestamp.valueOf("1900-06-11 12:14:50.789"),
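Why the offset matters for seconds specifically: historical UTC offsets are not always whole minutes, so dropping the zone offset (as the old `absoluteMicroSecond`-based `getSeconds` did) skews the seconds field. A small sketch of the underlying fact, assuming a common tzdata release in which Asia/Shanghai used local mean time (+8:05:43) before 1901:

```scala
import java.sql.Timestamp
import java.util.TimeZone

val tz = TimeZone.getTimeZone("Asia/Shanghai")
val millis = Timestamp.valueOf("1900-06-11 12:14:50.789").getTime
val offsetSeconds = tz.getOffset(millis) / 1000

// Expect roughly 29143 seconds = 8h 5m 43s (zone-data dependent); the
// trailing 43 seconds is exactly the error the old getSeconds produced.
println(s"offset = ${offsetSeconds}s, leftover seconds = ${offsetSeconds % 60}")
```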
spark git commit: [SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds
Repository: spark
Updated Branches: refs/heads/master 0e79604ae -> 06f1fdba6

[SPARK-11752] [SQL] fix timezone problem for DateTimeUtils.getSeconds

Code snippet to reproduce it:
```
TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"))
val t = Timestamp.valueOf("1900-06-11 12:14:50.789")
val us = fromJavaTimestamp(t)
assert(getSeconds(us) === t.getSeconds)
```
It would be good to add a regression test for it, but the reproducing code needs to change the default timezone, and even if we change it back, the `lazy val defaultTimeZone` in `DateTimeUtils` is already fixed.

Author: Wenchen Fan

Closes #9728 from cloud-fan/seconds.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06f1fdba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06f1fdba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06f1fdba Branch: refs/heads/master Commit: 06f1fdba6d1425afddfc1d45a20dbe9bede15e7a Parents: 0e79604 Author: Wenchen Fan Authored: Mon Nov 16 08:58:40 2015 -0800 Committer: Davies Liu Committed: Mon Nov 16 08:58:40 2015 -0800

-- .../spark/sql/catalyst/util/DateTimeUtils.scala | 14 -- .../spark/sql/catalyst/util/DateTimeUtilsSuite.scala | 2 +- 2 files changed, 9 insertions(+), 7 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/06f1fdba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index deff8a5..8fb3f41 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -395,16 +395,19 @@ object DateTimeUtils { /** * Returns the microseconds since year zero (-17999) from microseconds since epoch. */ - def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = { + private def absoluteMicroSecond(microsec: SQLTimestamp): SQLTimestamp = { microsec + toYearZero * MICROS_PER_DAY } + private def localTimestamp(microsec: SQLTimestamp): SQLTimestamp = { +absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L + } + /** * Returns the hour value of a given timestamp value. The timestamp is expressed in microseconds. */ def getHours(microsec: SQLTimestamp): Int = { -val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L -((localTs / MICROS_PER_SECOND / 3600) % 24).toInt +((localTimestamp(microsec) / MICROS_PER_SECOND / 3600) % 24).toInt } /** @@ -412,8 +415,7 @@ object DateTimeUtils { * microseconds. */ def getMinutes(microsec: SQLTimestamp): Int = { -val localTs = absoluteMicroSecond(microsec) + defaultTimeZone.getOffset(microsec / 1000) * 1000L -((localTs / MICROS_PER_SECOND / 60) % 60).toInt +((localTimestamp(microsec) / MICROS_PER_SECOND / 60) % 60).toInt } /** @@ -421,7 +423,7 @@ object DateTimeUtils { * microseconds.
*/ def getSeconds(microsec: SQLTimestamp): Int = { -((absoluteMicroSecond(microsec) / MICROS_PER_SECOND) % 60).toInt +((localTimestamp(microsec) / MICROS_PER_SECOND) % 60).toInt } private[this] def isLeapYear(year: Int): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/06f1fdba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 64d15e6..60d4542 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -358,7 +358,7 @@ class DateTimeUtilsSuite extends SparkFunSuite { assert(getSeconds(c.getTimeInMillis * 1000) === 9) } - test("hours / miniute / seconds") { + test("hours / minutes / seconds") { Seq(Timestamp.valueOf("2015-06-11 10:12:35.789"), Timestamp.valueOf("2015-06-11 20:13:40.789"), Timestamp.valueOf("1900-06-11 12:14:50.789"),
spark git commit: [SPARK-11692][SQL] Support for Parquet logical types, JSON and BSON (embedded types)
Repository: spark
Updated Branches: refs/heads/master 7f8eb3bf6 -> e388b39d1

[SPARK-11692][SQL] Support for Parquet logical types, JSON and BSON (embedded types)

Parquet supports some JSON and BSON datatypes. They are represented as binary for BSON and string (UTF-8) for JSON internally. I searched a bit and found Apache Drill also supports both in this way, [link](https://drill.apache.org/docs/parquet-format/).

Author: hyukjinkwon
Author: Hyukjin Kwon

Closes #9658 from HyukjinKwon/SPARK-11692.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e388b39d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e388b39d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e388b39d Branch: refs/heads/master Commit: e388b39d10fc269cdd3d630ea7d4ae80fd0efa97 Parents: 7f8eb3b Author: hyukjinkwon Authored: Mon Nov 16 21:59:33 2015 +0800 Committer: Cheng Lian Committed: Mon Nov 16 21:59:33 2015 +0800

-- .../parquet/CatalystSchemaConverter.scala | 3 ++- .../datasources/parquet/ParquetIOSuite.scala| 25 2 files changed, 27 insertions(+), 1 deletion(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/e388b39d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index f28a18e..5f9f908 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -170,9 +170,10 @@ private[parquet] class CatalystSchemaConverter( case BINARY => originalType match { - case UTF8 | ENUM => StringType + case UTF8 | ENUM | JSON => StringType case null if assumeBinaryIsString => StringType case null => BinaryType + case BSON => BinaryType case DECIMAL => makeDecimalType() case _ => illegalType() } http://git-wip-us.apache.org/repos/asf/spark/blob/e388b39d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 2aa5dca..a148fac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -259,6 +259,31 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("SPARK-11692 Support for Parquet logical types, JSON and BSON (embedded types)") { +val parquetSchema = MessageTypeParser.parseMessageType( + """message root { +| required binary a(JSON); +| required binary b(BSON); +|} """.stripMargin) + +withTempPath { location => + val extraMetadata = Map.empty[String, String].asJava + val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") + val path = new Path(location.getCanonicalPath) + val footer = List( +new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())) + ).asJava + + ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer) + + val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType +
assert(jsonDataType === StringType) + val bsonDataType = sqlContext.read.parquet(path.toString).schema(1).dataType + assert(bsonDataType === BinaryType) +} + } + test("compression codec") { def compressionCodecFor(path: String, codecName: String): String = { val codecs = for {
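From the user's side, the mapping means a Parquet file whose schema annotates binary columns with the JSON or BSON logical types now loads cleanly. A usage sketch, assuming such a file exists at the (hypothetical) path below:

```scala
import org.apache.spark.sql.types.{BinaryType, StringType}

// Schema of the file (Parquet message-type syntax):
//   message root {
//     required binary a (JSON);
//     required binary b (BSON);
//   }
val df = sqlContext.read.parquet("/tmp/embedded_types.parquet")

// JSON surfaces as a UTF-8 string column, BSON stays binary.
assert(df.schema("a").dataType == StringType)
assert(df.schema("b").dataType == BinaryType)
```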
spark git commit: [SPARK-10181][SQL] Do kerberos login for credentials during hive client initialization
Repository: spark
Updated Branches: refs/heads/branch-1.5 bf79a171e -> 51fc152b7

[SPARK-10181][SQL] Do kerberos login for credentials during hive client initialization

On driver process start-up, UserGroupInformation.loginUserFromKeytab is called with the principal and keytab passed in, and therefore the static var UserGroupInformation.loginUser is set to that principal with Kerberos credentials saved in its private credential set; all threads within the driver process are supposed to see and use this login to authenticate with Hive and Hadoop. However, because of IsolatedClientLoader, the UserGroupInformation class is not shared with Hive metastore clients; instead it is loaded separately and is of course not able to see the prepared Kerberos login credentials in the main thread. The first proposed fix would cause other classloader conflict errors and is not an appropriate solution. This new change does the Kerberos login during hive client initialization, which makes credentials ready for the particular hive client instance. yhuai Please take a look and let me know. If you are not the right person to talk to, could you point me to someone responsible for this?

Author: Yu Gao
Author: gaoyu
Author: Yu Gao

Closes #9272 from yolandagao/master.

(cherry picked from commit 72c1d68b4ab6acb3f85971e10947caabb4bd846d) Signed-off-by: Yin Huai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51fc152b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51fc152b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51fc152b Branch: refs/heads/branch-1.5 Commit: 51fc152b7e194282940eab29fe0069edef8a67a5 Parents: bf79a17 Author: Yu Gao Authored: Sun Nov 15 14:53:59 2015 -0800 Committer: Yin Huai Committed: Mon Nov 16 10:29:39 2015 -0800

-- .../org/apache/spark/deploy/SparkSubmit.scala | 17 +++--- .../spark/sql/hive/client/ClientWrapper.scala | 24 +++- 2 files changed, 37 insertions(+), 4 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/51fc152b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fefbba9..dc555cb 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -39,7 +39,7 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.repository.file.FileRepository import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver} -import org.apache.spark.{SparkUserAppException, SPARK_VERSION} +import org.apache.spark.{SparkException, SparkUserAppException, SPARK_VERSION} import org.apache.spark.api.r.RUtils import org.apache.spark.deploy.rest._ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -521,8 +521,19 @@ object SparkSubmit { sysProps.put("spark.yarn.isPython", "true") } if (args.principal != null) { -require(args.keytab != null, "Keytab must be specified when the keytab is specified") -UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) +require(args.keytab != null, "Keytab must be specified when principal is specified") +if (!new File(args.keytab).exists()) { + throw new SparkException(s"Keytab file: ${args.keytab} does not exist") +} else { + // Add keytab and principal configurations in sysProps to make them available + // for later use;
e.g. in spark sql, the isolated class loader used to talk + // to HiveMetastore will use these settings. They will be set as Java system + // properties and then loaded by SparkConf + sysProps.put("spark.yarn.keytab", args.keytab) + sysProps.put("spark.yarn.principal", args.principal) + + UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab) +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/51fc152b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index f45747a..436f2e5 100644 ---
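A minimal sketch of the pattern the fix applies inside `ClientWrapper`: repeat the keytab login inside the isolated classloader, whose own copy of `UserGroupInformation` cannot see the login done in `SparkSubmit`. The two system properties are the ones the commit forwards; the surrounding control flow here is an assumption, not the exact ClientWrapper code.

```scala
import org.apache.hadoop.security.UserGroupInformation

// SparkSubmit now publishes these as Java system properties, so they are
// visible even across the isolated classloader boundary.
val principal = System.getProperty("spark.yarn.principal")
val keytab = System.getProperty("spark.yarn.keytab")

// Re-run the keytab login so this classloader's static
// UserGroupInformation.loginUser carries the Kerberos credentials.
if (principal != null && keytab != null && UserGroupInformation.isSecurityEnabled) {
  UserGroupInformation.loginUserFromKeytab(principal, keytab)
}
```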
spark git commit: [SPARK-11044][SQL] Parquet writer version fixed as version1
Repository: spark
Updated Branches: refs/heads/branch-1.6 1887fa228 -> f14fb291d

[SPARK-11044][SQL] Parquet writer version fixed as version1

https://issues.apache.org/jira/browse/SPARK-11044

Spark writes a Parquet file only with writer version1, ignoring the writer version given by the user. So, in this PR, it keeps the writer version if one is given, or sets version1 as the default.

Author: hyukjinkwon
Author: HyukjinKwon

Closes #9060 from HyukjinKwon/SPARK-11044.

(cherry picked from commit 7f8eb3bf6ed64eefc5472f5c5fb02e2db1e3f618) Signed-off-by: Cheng Lian

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f14fb291 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f14fb291 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f14fb291 Branch: refs/heads/branch-1.6 Commit: f14fb291d822720d8b578db0bdb656fb6c9ce590 Parents: 1887fa2 Author: hyukjinkwon Authored: Mon Nov 16 21:30:10 2015 +0800 Committer: Cheng Lian Committed: Tue Nov 17 03:09:33 2015 +0800

-- .../parquet/CatalystWriteSupport.scala | 2 +- .../datasources/parquet/ParquetIOSuite.scala| 34 2 files changed, 35 insertions(+), 1 deletion(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/f14fb291/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala index 483363d..6862dea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala @@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport { def setSchema(schema: StructType, configuration: Configuration): Unit = { schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName) configuration.set(SPARK_ROW_SCHEMA, schema.json) -configuration.set( +configuration.setIfUnset( ParquetOutputFormat.WRITER_VERSION, ParquetProperties.WriterVersion.PARQUET_1_0.toString) } http://git-wip-us.apache.org/repos/asf/spark/blob/f14fb291/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 78df363..2aa5dca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.Collections +import org.apache.parquet.column.{Encoding, ParquetProperties} + import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } } + test("SPARK-11044 Parquet writer version fixed as version1 ") { +// For dictionary encoding, Parquet changes the encoding types according to its writer +// version. So, this test checks one of the encoding types in order to ensure that +// the file is written with writer version2.
+withTempPath { dir => + val clonedConf = new Configuration(hadoopConfiguration) + try { +// Write a Parquet file with writer version2. +hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_2_0.toString) + +// By default, dictionary encoding is enabled from Parquet 1.2.0 but +// it is enabled just in case. +hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, true) +val path = s"${dir.getCanonicalPath}/part-r-0.parquet" +sqlContext.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.mode("overwrite").parquet(path) + +val blockMetadata = readFooter(new Path(path), hadoopConfiguration).getBlocks.asScala.head +val columnChunkMetadata = blockMetadata.getColumns.asScala.head + +// If the file is written with version2, this should include +// Encoding.RLE_DICTIONARY type. For version1, it is
spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`
Repository: spark
Updated Branches: refs/heads/branch-1.6 38fe092ff -> fbe65c592

[SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`

These two are very similar; we can consolidate them into one. This also adds tests for it and fixes a bug.

Author: Wenchen Fan

Closes #9729 from cloud-fan/tuple.

(cherry picked from commit b1a9662623951079e80bd7498e064c4cae4977e9) Signed-off-by: Michael Armbrust

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbe65c59 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbe65c59 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbe65c59 Branch: refs/heads/branch-1.6 Commit: fbe65c5924d2f5f4789bf54a1da0a7b6bbf1eb42 Parents: 38fe092 Author: Wenchen Fan Authored: Mon Nov 16 12:45:34 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 12:46:26 2015 -0800

-- .../scala/org/apache/spark/sql/Encoder.scala| 95 - .../catalyst/encoders/ExpressionEncoder.scala | 104 ++- .../catalyst/encoders/ProductEncoderSuite.scala | 29 ++ 3 files changed, 108 insertions(+), 120 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/fbe65c59/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 5f619d6..c8b017e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -19,10 +19,8 @@ package org.apache.spark.sql import scala.reflect.ClassTag -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ObjectType, StructField, StructType} -import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor} +import org.apache.spark.sql.types.StructType /** * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation.
@@ -49,83 +47,34 @@ object Encoders { def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true) def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true) - def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = { -tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2)]] + def tuple[T1, T2]( + e1: Encoder[T1], + e2: Encoder[T2]): Encoder[(T1, T2)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2)) } def tuple[T1, T2, T3]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = { -tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3]): Encoder[(T1, T2, T3)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3)) } def tuple[T1, T2, T3, T4]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { -tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3], + e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4)) } def tuple[T1, T2, T3, T4, T5]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4], - enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { -tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]] - } - - private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { -assert(encoders.length > 1) -// make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`. - assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty)) - -val schema = StructType(encoders.zipWithIndex.map { - case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema) -}) - -val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}") - -val extractExpressions = encoders.map { - case e if e.flat =>
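The public API is unchanged by the consolidation: tuple encoders are still built from component encoders, and the overloads now all delegate to `ExpressionEncoder.tuple`. A usage sketch, restricted to members visible in the diff above:

```scala
import org.apache.spark.sql.Encoders

// Build a pair encoder from primitive encoders...
val pair = Encoders.tuple(Encoders.STRING, Encoders.DOUBLE)

// ...and compose: Encoder[((String, java.lang.Double), String)].
val nested = Encoders.tuple(pair, Encoders.STRING)
```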
spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable
Repository: spark
Updated Branches: refs/heads/branch-1.6 fbe65c592 -> 90d71bff0

[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply.

Author: Zee Chen

Closes #9679 from zeocio/spark-11390.

(cherry picked from commit 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181) Signed-off-by: Michael Armbrust

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d71bff Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d71bff Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d71bff Branch: refs/heads/branch-1.6 Commit: 90d71bff0c583830aa3fd96b1dd3607f0cb0cbee Parents: fbe65c5 Author: Zee Chen Authored: Mon Nov 16 14:21:28 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 14:21:41 2015 -0800

-- .../org/apache/spark/sql/execution/ExistingRDD.scala | 6 -- .../execution/datasources/DataSourceStrategy.scala| 6 -- .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++ 3 files changed, 22 insertions(+), 4 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 8b41d3d..62620ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,9 @@ private[sql] object PhysicalRDD { def createFromDataSource( output: Seq[Attribute], rdd: RDD[InternalRow], - relation: BaseRelation): PhysicalRDD = { -PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation]) + relation: BaseRelation, + extraInformation: String = ""): PhysicalRDD = { +PhysicalRDD(output, rdd, relation.toString + extraInformation, + relation.isInstanceOf[HadoopFsRelation]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bbbfa7..544d5ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) +val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ") + if (projects.map(_.toAttribute) == projects && projectSet.size == projects.size && filterSet.subsetOf(projectSet)) { @@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) execution.Project( projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } http://git-wip-us.apache.org/repos/asf/spark/blob/90d71bff/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index be53ec3..dfec139 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++
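The visible effect is in `explain()` output: the scan node's string now carries the pushed filters, so plans with and without filter pushdown are distinguishable. A sketch; the path is hypothetical and the rendering below is approximate:

```scala
// Physical plan now shows something along the lines of:
//   Scan ParquetRelation[file:/tmp/t][a#0L] PushedFilter: [GreaterThan(a,1)]
sqlContext.read.parquet("/tmp/t").filter("a > 1").explain()
```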
spark git commit: [SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable
Repository: spark
Updated Branches: refs/heads/master b1a966262 -> 985b38dd2

[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply.

Author: Zee Chen

Closes #9679 from zeocio/spark-11390.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/985b38dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/985b38dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/985b38dd Branch: refs/heads/master Commit: 985b38dd2fa5d8f1e23f1c420ce6262e7e3ed181 Parents: b1a9662 Author: Zee Chen Authored: Mon Nov 16 14:21:28 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 14:21:28 2015 -0800

-- .../org/apache/spark/sql/execution/ExistingRDD.scala | 6 -- .../execution/datasources/DataSourceStrategy.scala| 6 -- .../org/apache/spark/sql/execution/PlannerSuite.scala | 14 ++ 3 files changed, 22 insertions(+), 4 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 8b41d3d..62620ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -106,7 +106,9 @@ private[sql] object PhysicalRDD { def createFromDataSource( output: Seq[Attribute], rdd: RDD[InternalRow], - relation: BaseRelation): PhysicalRDD = { -PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation]) + relation: BaseRelation, + extraInformation: String = ""): PhysicalRDD = { +PhysicalRDD(output, rdd, relation.toString + extraInformation, + relation.isInstanceOf[HadoopFsRelation]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 9bbbfa7..544d5ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) +val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ") + if (projects.map(_.toAttribute) == projects && projectSet.size == projects.size && filterSet.subsetOf(projectSet)) { @@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) filterCondition.map(execution.Filter(_, scan)).getOrElse(scan) } else { // Don't request columns that are only referenced by pushed filters.
@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val scan = execution.PhysicalRDD.createFromDataSource( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), -relation.relation) +relation.relation, pushedFiltersString) execution.Project( projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)) } http://git-wip-us.apache.org/repos/asf/spark/blob/985b38dd/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index be53ec3..dfec139 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext { } }
spark git commit: Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"
Repository: spark
Updated Branches: refs/heads/branch-1.6 90d71bff0 -> 64439f7d6

Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64439f7d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64439f7d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64439f7d Branch: refs/heads/branch-1.6 Commit: 64439f7d61edfbaa5513ddcf3f9dc86fa905aef2 Parents: 90d71bf Author: Davies Liu Authored: Mon Nov 16 14:50:38 2015 -0800 Committer: Davies Liu Committed: Mon Nov 16 14:51:25 2015 -0800

-- core/pom.xml| 4 ++ .../org/apache/spark/scheduler/MapStatus.scala | 13 +++--- .../spark/serializer/KryoSerializer.scala | 10 +++- .../apache/spark/util/collection/BitSet.scala | 28 ++- .../spark/serializer/KryoSerializerSuite.scala | 6 +++ .../spark/util/collection/BitSetSuite.scala | 49 pom.xml | 5 ++ 7 files changed, 33 insertions(+), 82 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 7e1205a..37e3f16 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -178,6 +178,10 @@ lz4 + org.roaringbitmap + RoaringBitmap + + commons-net commons-net http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 180c8d1..1efce12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,8 +19,9 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} +import org.roaringbitmap.RoaringBitmap + import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.collection.BitSet import org.apache.spark.util.Utils /** @@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus( private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, -private[this] var emptyBlocks: BitSet, +private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long) extends MapStatus with Externalizable { @@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { -if (emptyBlocks.get(reduceId)) { +if (emptyBlocks.contains(reduceId)) { 0 } else { avgSize @@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) -emptyBlocks = new BitSet +emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() } @@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus { // From a compression standpoint, it shouldn't matter whether we track empty or non-empty // blocks. From a performance standpoint, we benefit from tracking empty blocks because // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+val emptyBlocks = new RoaringBitmap() val totalNumBlocks = uncompressedSizes.length -val emptyBlocks = new BitSet(totalNumBlocks) while (i < totalNumBlocks) { var size = uncompressedSizes(i) if (size > 0) { numNonEmptyBlocks += 1 totalSize += size } else { -emptyBlocks.set(i) +emptyBlocks.add(i) } i += 1 } http://git-wip-us.apache.org/repos/asf/spark/blob/64439f7d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index bc51d4f..c5195c1 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input =>
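The revert restores `RoaringBitmap` as the structure tracking empty shuffle blocks in `HighlyCompressedMapStatus`: a compressed bitmap keyed by reduce partition id. A minimal sketch of that usage:

```scala
import org.roaringbitmap.RoaringBitmap

// Record which reduce partitions produced empty blocks.
val emptyBlocks = new RoaringBitmap()
emptyBlocks.add(0)
emptyBlocks.add(7)

// getSizeForBlock then answers 0 for empty blocks and falls back to the
// average size for everything else.
assert(emptyBlocks.contains(7))   // empty -> reported size 0
assert(!emptyBlocks.contains(3))  // non-empty -> reported size avgSize
```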
spark git commit: Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"
Repository: spark
Updated Branches: refs/heads/master 985b38dd2 -> 3c025087b

Revert "[SPARK-11271][SPARK-11016][CORE] Use Spark BitSet instead of RoaringBitmap to reduce memory usage"

This reverts commit e209fa271ae57dc8849f8b1241bf1ea7d6d3d62c.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c025087 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c025087 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c025087 Branch: refs/heads/master Commit: 3c025087b58f475a9bcb5c8f4b2b2df804915b2b Parents: 985b38d Author: Davies Liu Authored: Mon Nov 16 14:50:38 2015 -0800 Committer: Davies Liu Committed: Mon Nov 16 14:50:38 2015 -0800

-- core/pom.xml| 4 ++ .../org/apache/spark/scheduler/MapStatus.scala | 13 +++--- .../spark/serializer/KryoSerializer.scala | 10 +++- .../apache/spark/util/collection/BitSet.scala | 28 ++- .../spark/serializer/KryoSerializerSuite.scala | 6 +++ .../spark/util/collection/BitSetSuite.scala | 49 pom.xml | 5 ++ 7 files changed, 33 insertions(+), 82 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 7e1205a..37e3f16 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -178,6 +178,10 @@ lz4 + org.roaringbitmap + RoaringBitmap + + commons-net commons-net http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 180c8d1..1efce12 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,8 +19,9 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} +import org.roaringbitmap.RoaringBitmap + import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.collection.BitSet import org.apache.spark.util.Utils /** @@ -132,7 +133,7 @@ private[spark] class CompressedMapStatus( private[spark] class HighlyCompressedMapStatus private ( private[this] var loc: BlockManagerId, private[this] var numNonEmptyBlocks: Int, -private[this] var emptyBlocks: BitSet, +private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long) extends MapStatus with Externalizable { @@ -145,7 +146,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def location: BlockManagerId = loc override def getSizeForBlock(reduceId: Int): Long = { -if (emptyBlocks.get(reduceId)) { +if (emptyBlocks.contains(reduceId)) { 0 } else { avgSize @@ -160,7 +161,7 @@ private[spark] class HighlyCompressedMapStatus private ( override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) -emptyBlocks = new BitSet +emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() } @@ -176,15 +177,15 @@ private[spark] object HighlyCompressedMapStatus { // From a compression standpoint, it shouldn't matter whether we track empty or non-empty // blocks. From a performance standpoint, we benefit from tracking empty blocks because // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
+val emptyBlocks = new RoaringBitmap() val totalNumBlocks = uncompressedSizes.length -val emptyBlocks = new BitSet(totalNumBlocks) while (i < totalNumBlocks) { var size = uncompressedSizes(i) if (size > 0) { numNonEmptyBlocks += 1 totalSize += size } else { -emptyBlocks.set(i) +emptyBlocks.add(i) } i += 1 } http://git-wip-us.apache.org/repos/asf/spark/blob/3c025087/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala -- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index bc51d4f..c5195c1 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -30,6 +30,7 @@ import com.esotericsoftware.kryo.io.{Input => KryoInput,
spark git commit: [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default
Repository: spark
Updated Branches: refs/heads/master b0c3fd34e -> de5e531d3

[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions:
- batching on by default
- closeFileAfterWrite off by default
- parallelRecovery off by default

Author: Burak Yavuz

Closes #9695 from brkyvz/enable-batch-wal.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de5e531d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de5e531d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de5e531d Branch: refs/heads/master Commit: de5e531d337075fd849437e88846873bca8685e6 Parents: b0c3fd3 Author: Burak Yavuz Authored: Mon Nov 16 11:21:17 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 16 11:21:17 2015 -0800

-- .../streaming/util/WriteAheadLogUtils.scala | 2 +- .../spark/streaming/JavaWriteAheadLogSuite.java | 1 + .../streaming/ReceivedBlockTrackerSuite.scala | 9 ++-- .../streaming/util/WriteAheadLogSuite.scala | 24 +++- .../util/WriteAheadLogUtilsSuite.scala | 19 +--- 5 files changed, 48 insertions(+), 7 deletions(-)

-- http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index 731a369..7f9e2c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { -isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false) +isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java -- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 175b8a4..09b5f8e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { public void testCustomWAL() { SparkConf conf = new SparkConf(); conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); +conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false"); WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1"; http://git-wip-us.apache.org/repos/asf/spark/blob/de5e531d/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17ab..081f5a1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq -}.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) +}.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { + Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap) + } else { +Array(byteBuffer) + } + validBuffer.map(b =>
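Since batching is now the default, users who need the old behavior opt out explicitly with the same key the tests exercise. A minimal sketch:

```scala
import org.apache.spark.SparkConf

// Disable driver-side write-ahead-log batching (on by default after this
// change); key taken from the diff above.
val conf = new SparkConf()
  .set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
```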
spark git commit: [SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default
Repository: spark
Updated Branches: refs/heads/branch-1.6 f14fb291d -> 38673d7e6

[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default

Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions:
- batching on by default
- closeFileAfterWrite off by default
- parallelRecovery off by default

Author: Burak Yavuz

Closes #9695 from brkyvz/enable-batch-wal.

(cherry picked from commit de5e531d337075fd849437e88846873bca8685e6)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38673d7e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38673d7e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38673d7e

Branch: refs/heads/branch-1.6
Commit: 38673d7e6358622f240d7331b061cadb96f8409f
Parents: f14fb29
Author: Burak Yavuz
Authored: Mon Nov 16 11:21:17 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 16 11:21:27 2015 -0800

--
.../streaming/util/WriteAheadLogUtils.scala | 2 +-
.../spark/streaming/JavaWriteAheadLogSuite.java | 1 +
.../streaming/ReceivedBlockTrackerSuite.scala | 9 ++--
.../streaming/util/WriteAheadLogSuite.scala | 24 +++-
.../util/WriteAheadLogUtilsSuite.scala | 19 +---
5 files changed, 48 insertions(+), 7 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala index 731a369..7f9e2c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala @@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging { } def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = { -isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false) +isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true) } /**

http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java
--
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java index 175b8a4..09b5f8e 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java @@ -108,6 +108,7 @@ public class JavaWriteAheadLogSuite extends WriteAheadLog { public void testCustomWAL() { SparkConf conf = new SparkConf(); conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName()); +conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false"); WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null); String data1 = "data1";

http://git-wip-us.apache.org/repos/asf/spark/blob/38673d7e/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 7db17ab..081f5a1 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite : Seq[ReceivedBlockTrackerLogEvent] = { logFiles.flatMap { file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq -}.map { byteBuffer => - Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array) +}.flatMap { byteBuffer => + val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) { +
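Editor's note on the change above: with this commit, driver-side WAL batching is on unless the user opts out. A minimal sketch of the resulting configuration behavior, using only the standard SparkConf API and the config key shown in the diffs (the surrounding object name is hypothetical):

import org.apache.spark.SparkConf

object WalBatchingDefaultSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // With nothing set, batching now defaults to true on the driver side.
    val batching = conf.getBoolean(
      "spark.streaming.driver.writeAheadLog.allowBatching", defaultValue = true)
    println(batching) // true
    // Custom WALs that cannot batch can opt out, as the Java test above does:
    conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false")
  }
}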
spark git commit: [SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue
Repository: spark
Updated Branches: refs/heads/master ace0db471 -> 24477d270

[SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

Currently, if dynamic allocation is enabled, explicitly killing an executor gets no response, so the executor metadata on the driver side is wrong, which makes dynamic allocation on YARN fail to work. The problem is that `disableExecutor` returns false for executors pending kill when `onDisconnect` is detected, so no further handling happens. One solution is to let these explicitly killed executors fall through to `super.onDisconnect`, which removes the executor; this is simple. Another solution is to still query the loss reason for these explicitly killed executors. Since an executor may be killed and reported in the same AM-RM communication, the current way of adding a pending loss-reason request does not work (the container-complete event has already been processed), so we should store the loss reason for a later query. This PR chooses solution 2. Please help to review. vanzin I think this part was changed by you previously, would you please help to review? Thanks a lot.

Author: jerryshao

Closes #9684 from jerryshao/SPARK-11718.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24477d27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24477d27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24477d27

Branch: refs/heads/master
Commit: 24477d2705bcf2a851acc241deb8376c5450dc73
Parents: ace0db4
Author: jerryshao
Authored: Mon Nov 16 11:43:18 2015 -0800
Committer: Marcelo Vanzin
Committed: Mon Nov 16 11:43:18 2015 -0800

--
.../spark/scheduler/TaskSchedulerImpl.scala | 1 +
.../cluster/CoarseGrainedSchedulerBackend.scala | 6 ++--
.../spark/deploy/yarn/YarnAllocator.scala | 30
3 files changed, 29 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 43d7d80..5f13669 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl( // If the host mapping still exists, it means we don't know the loss reason for the // executor. So call removeExecutor() to update tasks running on that executor when // the real loss reason is finally known. + logError(s"Actual reason for lost executor $executorId: ${reason.message}") removeExecutor(executorId, reason) case None =>

http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f71d98f..3373caf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending.
* - * @return Whether executor was alive. + * @return Whether executor should be disabled */ protected def disableExecutor(executorId: String): Boolean = { val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { @@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorsPendingLossReason += executorId true } else { - false + // Returns true for explicitly killed executors, we also need to get pending loss reasons; + // For others return false. + executorsPendingToRemove.contains(executorId) } } http://git-wip-us.apache.org/repos/asf/spark/blob/24477d27/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala -- diff --git
spark git commit: [SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue
Repository: spark
Updated Branches: refs/heads/branch-1.6 c83177d30 -> 38fe092ff

[SPARK-11718][YARN][CORE] Fix explicitly killed executor dies silently issue

Currently, if dynamic allocation is enabled, explicitly killing an executor gets no response, so the executor metadata on the driver side is wrong, which makes dynamic allocation on YARN fail to work. The problem is that `disableExecutor` returns false for executors pending kill when `onDisconnect` is detected, so no further handling happens. One solution is to let these explicitly killed executors fall through to `super.onDisconnect`, which removes the executor; this is simple. Another solution is to still query the loss reason for these explicitly killed executors. Since an executor may be killed and reported in the same AM-RM communication, the current way of adding a pending loss-reason request does not work (the container-complete event has already been processed), so we should store the loss reason for a later query. This PR chooses solution 2. Please help to review. vanzin I think this part was changed by you previously, would you please help to review? Thanks a lot.

Author: jerryshao

Closes #9684 from jerryshao/SPARK-11718.

(cherry picked from commit 24477d2705bcf2a851acc241deb8376c5450dc73)
Signed-off-by: Marcelo Vanzin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38fe092f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38fe092f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38fe092f

Branch: refs/heads/branch-1.6
Commit: 38fe092ff3a1e86088a22adc934a4b4e269b0a47
Parents: c83177d
Author: jerryshao
Authored: Mon Nov 16 11:43:18 2015 -0800
Committer: Marcelo Vanzin
Committed: Mon Nov 16 11:43:39 2015 -0800

--
.../spark/scheduler/TaskSchedulerImpl.scala | 1 +
.../cluster/CoarseGrainedSchedulerBackend.scala | 6 ++--
.../spark/deploy/yarn/YarnAllocator.scala | 30
3 files changed, 29 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 43d7d80..5f13669 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -473,6 +473,7 @@ private[spark] class TaskSchedulerImpl( // If the host mapping still exists, it means we don't know the loss reason for the // executor. So call removeExecutor() to update tasks running on that executor when // the real loss reason is finally known.
+ logError(s"Actual reason for lost executor $executorId: ${reason.message}") removeExecutor(executorId, reason) case None => http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index f71d98f..3373caf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -269,7 +269,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Stop making resource offers for the given executor. The executor is marked as lost with * the loss reason still pending. * - * @return Whether executor was alive. + * @return Whether executor should be disabled */ protected def disableExecutor(executorId: String): Boolean = { val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized { @@ -277,7 +277,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorsPendingLossReason += executorId true } else { - false + // Returns true for explicitly killed executors, we also need to get pending loss reasons; + // For others return false. + executorsPendingToRemove.contains(executorId) } } http://git-wip-us.apache.org/repos/asf/spark/blob/38fe092f/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
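A self-contained sketch of the decision logic this patch adds to `disableExecutor` (names and data structures simplified and hypothetical; the real method lives in CoarseGrainedSchedulerBackend): an executor pending an explicit kill still counts as disabled, so its real loss reason gets queried later, while a completely unknown executor does not.

import scala.collection.mutable

object DisableExecutorSketch {
  private val executorDataMap = mutable.Map("exec-1" -> "data")  // alive executors
  private val executorsPendingLossReason = mutable.Set.empty[String]
  private val executorsPendingToRemove = mutable.Set("exec-2")   // explicitly killed

  def shouldDisable(executorId: String): Boolean = synchronized {
    if (executorDataMap.contains(executorId)) {
      executorsPendingLossReason += executorId
      true
    } else {
      // Explicitly killed executors also need their pending loss reason resolved.
      executorsPendingToRemove.contains(executorId)
    }
  }

  def main(args: Array[String]): Unit = {
    println(shouldDisable("exec-1")) // true: alive, now pending a loss reason
    println(shouldDisable("exec-2")) // true: explicitly killed, reason still queried
    println(shouldDisable("exec-3")) // false: unknown executor
  }
}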
spark git commit: [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple`
Repository: spark Updated Branches: refs/heads/master 24477d270 -> b1a966262 [SPARK-11754][SQL] consolidate `ExpressionEncoder.tuple` and `Encoders.tuple` These 2 are very similar, we can consolidate them into one. Also add tests for it and fix a bug. Author: Wenchen FanCloses #9729 from cloud-fan/tuple. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1a96626 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1a96626 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1a96626 Branch: refs/heads/master Commit: b1a9662623951079e80bd7498e064c4cae4977e9 Parents: 24477d2 Author: Wenchen Fan Authored: Mon Nov 16 12:45:34 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 12:45:34 2015 -0800 -- .../scala/org/apache/spark/sql/Encoder.scala| 95 - .../catalyst/encoders/ExpressionEncoder.scala | 104 ++- .../catalyst/encoders/ProductEncoderSuite.scala | 29 ++ 3 files changed, 108 insertions(+), 120 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b1a96626/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala index 5f619d6..c8b017e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala @@ -19,10 +19,8 @@ package org.apache.spark.sql import scala.reflect.ClassTag -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.{ObjectType, StructField, StructType} -import org.apache.spark.util.Utils +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, encoderFor} +import org.apache.spark.sql.types.StructType /** * Used to convert a JVM object of type `T` to and from the internal Spark SQL representation. 
@@ -49,83 +47,34 @@ object Encoders { def DOUBLE: Encoder[java.lang.Double] = ExpressionEncoder(flat = true) def STRING: Encoder[java.lang.String] = ExpressionEncoder(flat = true) - def tuple[T1, T2](enc1: Encoder[T1], enc2: Encoder[T2]): Encoder[(T1, T2)] = { -tuple(Seq(enc1, enc2).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2)]] + def tuple[T1, T2]( + e1: Encoder[T1], + e2: Encoder[T2]): Encoder[(T1, T2)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2)) } def tuple[T1, T2, T3]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3]): Encoder[(T1, T2, T3)] = { -tuple(Seq(enc1, enc2, enc3).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3]): Encoder[(T1, T2, T3)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3)) } def tuple[T1, T2, T3, T4]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { -tuple(Seq(enc1, enc2, enc3, enc4).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4)]] + e1: Encoder[T1], + e2: Encoder[T2], + e3: Encoder[T3], + e4: Encoder[T4]): Encoder[(T1, T2, T3, T4)] = { +ExpressionEncoder.tuple(encoderFor(e1), encoderFor(e2), encoderFor(e3), encoderFor(e4)) } def tuple[T1, T2, T3, T4, T5]( - enc1: Encoder[T1], - enc2: Encoder[T2], - enc3: Encoder[T3], - enc4: Encoder[T4], - enc5: Encoder[T5]): Encoder[(T1, T2, T3, T4, T5)] = { -tuple(Seq(enc1, enc2, enc3, enc4, enc5).map(_.asInstanceOf[ExpressionEncoder[_]])) - .asInstanceOf[ExpressionEncoder[(T1, T2, T3, T4, T5)]] - } - - private def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { -assert(encoders.length > 1) -// make sure all encoders are resolved, i.e. `Attribute` has been resolved to `BoundReference`. - assert(encoders.forall(_.fromRowExpression.find(_.isInstanceOf[Attribute]).isEmpty)) - -val schema = StructType(encoders.zipWithIndex.map { - case (e, i) => StructField(s"_${i + 1}", if (e.flat) e.schema.head.dataType else e.schema) -}) - -val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}") - -val extractExpressions = encoders.map { - case e if e.flat => e.toRowExpressions.head - case other => CreateStruct(other.toRowExpressions) -}.zipWithIndex.map { case (expr, index) => -
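A short usage sketch of the consolidated entry point: the public `Encoders.tuple` overloads now simply delegate to `ExpressionEncoder.tuple` via `encoderFor`. The composition below uses only encoder members visible in the diff; treat it as illustrative of the 1.6-era API.

import org.apache.spark.sql.{Encoder, Encoders}

object TupleEncoderSketch {
  // Compose a tuple encoder from primitive encoders, as Java callers would.
  val pair: Encoder[(String, java.lang.Double)] =
    Encoders.tuple(Encoders.STRING, Encoders.DOUBLE)

  def main(args: Array[String]): Unit = {
    println(pair.schema) // tuple fields are named _1, _2, as built by the schema above
  }
}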
spark git commit: [SPARK-6328][PYTHON] Python API for StreamingListener
Repository: spark Updated Branches: refs/heads/branch-1.6 38673d7e6 -> c83177d30 [SPARK-6328][PYTHON] Python API for StreamingListener Author: Daniel JalovaCloses #9186 from djalova/SPARK-6328. (cherry picked from commit ace0db47141ffd457c2091751038fc291f6d5a8b) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c83177d3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c83177d3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c83177d3 Branch: refs/heads/branch-1.6 Commit: c83177d3045afccf07465c43caac74d980673f42 Parents: 38673d7 Author: Daniel Jalova Authored: Mon Nov 16 11:29:27 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 16 11:29:36 2015 -0800 -- python/pyspark/streaming/__init__.py| 3 +- python/pyspark/streaming/context.py | 8 ++ python/pyspark/streaming/listener.py| 75 +++ python/pyspark/streaming/tests.py | 126 ++- .../api/java/JavaStreamingListener.scala| 76 +++ 5 files changed, 286 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c83177d3/python/pyspark/streaming/__init__.py -- diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py index d2644a1..66e8f8e 100644 --- a/python/pyspark/streaming/__init__.py +++ b/python/pyspark/streaming/__init__.py @@ -17,5 +17,6 @@ from pyspark.streaming.context import StreamingContext from pyspark.streaming.dstream import DStream +from pyspark.streaming.listener import StreamingListener -__all__ = ['StreamingContext', 'DStream'] +__all__ = ['StreamingContext', 'DStream', 'StreamingListener'] http://git-wip-us.apache.org/repos/asf/spark/blob/c83177d3/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 8be56c9..1388b6d 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -363,3 +363,11 @@ class StreamingContext(object): first = dstreams[0] jrest = [d._jdstream for d in dstreams[1:]] return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer) + +def addStreamingListener(self, streamingListener): +""" +Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for +receiving system events related to streaming. +""" +self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper( +self._jvm.PythonStreamingListenerWrapper(streamingListener))) http://git-wip-us.apache.org/repos/asf/spark/blob/c83177d3/python/pyspark/streaming/listener.py -- diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py new file mode 100644 index 000..b830797 --- /dev/null +++ b/python/pyspark/streaming/listener.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = ["StreamingListener"] + + +class StreamingListener(object): + +def __init__(self): +pass + +def onReceiverStarted(self, receiverStarted): +""" +Called when a receiver has been started +""" +pass + +def onReceiverError(self, receiverError): +""" +Called when a receiver has reported an error +""" +pass + +def onReceiverStopped(self, receiverStopped): +""" +Called when a receiver has been stopped +""" +pass + +def onBatchSubmitted(self, batchSubmitted): +""" +Called when a batch of jobs has been submitted for processing. +""" +pass + +def onBatchStarted(self, batchStarted): +"""
spark git commit: [SPARK-6328][PYTHON] Python API for StreamingListener
Repository: spark Updated Branches: refs/heads/master de5e531d3 -> ace0db471 [SPARK-6328][PYTHON] Python API for StreamingListener Author: Daniel JalovaCloses #9186 from djalova/SPARK-6328. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ace0db47 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ace0db47 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ace0db47 Branch: refs/heads/master Commit: ace0db47141ffd457c2091751038fc291f6d5a8b Parents: de5e531 Author: Daniel Jalova Authored: Mon Nov 16 11:29:27 2015 -0800 Committer: Tathagata Das Committed: Mon Nov 16 11:29:27 2015 -0800 -- python/pyspark/streaming/__init__.py| 3 +- python/pyspark/streaming/context.py | 8 ++ python/pyspark/streaming/listener.py| 75 +++ python/pyspark/streaming/tests.py | 126 ++- .../api/java/JavaStreamingListener.scala| 76 +++ 5 files changed, 286 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/__init__.py -- diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py index d2644a1..66e8f8e 100644 --- a/python/pyspark/streaming/__init__.py +++ b/python/pyspark/streaming/__init__.py @@ -17,5 +17,6 @@ from pyspark.streaming.context import StreamingContext from pyspark.streaming.dstream import DStream +from pyspark.streaming.listener import StreamingListener -__all__ = ['StreamingContext', 'DStream'] +__all__ = ['StreamingContext', 'DStream', 'StreamingListener'] http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/context.py -- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 8be56c9..1388b6d 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -363,3 +363,11 @@ class StreamingContext(object): first = dstreams[0] jrest = [d._jdstream for d in dstreams[1:]] return DStream(self._jssc.union(first._jdstream, jrest), self, first._jrdd_deserializer) + +def addStreamingListener(self, streamingListener): +""" +Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for +receiving system events related to streaming. +""" +self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper( +self._jvm.PythonStreamingListenerWrapper(streamingListener))) http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/listener.py -- diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py new file mode 100644 index 000..b830797 --- /dev/null +++ b/python/pyspark/streaming/listener.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +__all__ = ["StreamingListener"] + + +class StreamingListener(object): + +def __init__(self): +pass + +def onReceiverStarted(self, receiverStarted): +""" +Called when a receiver has been started +""" +pass + +def onReceiverError(self, receiverError): +""" +Called when a receiver has reported an error +""" +pass + +def onReceiverStopped(self, receiverStopped): +""" +Called when a receiver has been stopped +""" +pass + +def onBatchSubmitted(self, batchSubmitted): +""" +Called when a batch of jobs has been submitted for processing. +""" +pass + +def onBatchStarted(self, batchStarted): +""" +Called when processing of a batch of jobs has started. +""" +pass + +def onBatchCompleted(self,
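The new Python listener mirrors the JVM-side `StreamingListener` that `addStreamingListener` ultimately registers through the wrapper shown above. For reference, a minimal Scala equivalent of the same callback pattern (a sketch assuming an already-created StreamingContext named `ssc`):

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class PrintingListener extends StreamingListener {
  // Invoked by the streaming scheduler when a batch finishes.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    println(s"Batch finished: ${batchCompleted.batchInfo.batchTime}")
  }
}

// ssc.addStreamingListener(new PrintingListener())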
spark git commit: [SPARK-11625][SQL] add java test for typed aggregate
Repository: spark Updated Branches: refs/heads/master 75ee12f09 -> fd14936be [SPARK-11625][SQL] add java test for typed aggregate Author: Wenchen FanCloses #9591 from cloud-fan/agg-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd14936b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd14936b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd14936b Branch: refs/heads/master Commit: fd14936be7beff543dbbcf270f2f9749f7a803c4 Parents: 75ee12f Author: Wenchen Fan Authored: Mon Nov 16 15:32:49 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 15:32:49 2015 -0800 -- .../spark/api/java/function/Function.java | 2 +- .../org/apache/spark/sql/GroupedDataset.scala | 34 ++-- .../spark/sql/expressions/Aggregator.scala | 2 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 56 .../spark/sql/DatasetAggregatorSuite.scala | 7 ++- 5 files changed, 92 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fd14936b/core/src/main/java/org/apache/spark/api/java/function/Function.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java index d00551b..b9d9777 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java @@ -25,5 +25,5 @@ import java.io.Serializable; * when mapping RDDs of other types. */ public interface Function extends Serializable { - public R call(T1 v1) throws Exception; + R call(T1 v1) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/fd14936b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala index ebcf4c8..467cd42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala @@ -145,9 +145,37 @@ class GroupedDataset[K, T] private[sql]( reduce(f.call _) } - // To ensure valid overloading. - protected def agg(expr: Column, exprs: Column*): DataFrame = -groupedData.agg(expr, exprs: _*) + /** + * Compute aggregates by specifying a series of aggregate columns, and return a [[DataFrame]]. + * We can call `as[T : Encoder]` to turn the returned [[DataFrame]] to [[Dataset]] again. + * + * The available aggregate methods are defined in [[org.apache.spark.sql.functions]]. + * + * {{{ + * // Selects the age of the oldest employee and the aggregate expense for each department + * + * // Scala: + * import org.apache.spark.sql.functions._ + * df.groupBy("department").agg(max("age"), sum("expense")) + * + * // Java: + * import static org.apache.spark.sql.functions.*; + * df.groupBy("department").agg(max("age"), sum("expense")); + * }}} + * + * We can also use `Aggregator.toColumn` to pass in typed aggregate functions. + * + * @since 1.6.0 + */ + @scala.annotation.varargs + def agg(expr: Column, exprs: Column*): DataFrame = +groupedData.agg(withEncoder(expr), exprs.map(withEncoder): _*) + + private def withEncoder(c: Column): Column = c match { +case tc: TypedColumn[_, _] => + tc.withInputType(resolvedTEncoder.bind(dataAttributes), dataAttributes) +case _ => c + } /** * Internal helper function for building typed aggregations that return tuples. 
For simplicity http://git-wip-us.apache.org/repos/asf/spark/blob/fd14936b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index 360c9a5..72610e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn} * @tparam B The type of the intermediate value of the reduction. * @tparam C The type of the final result. */ -abstract class Aggregator[-A, B, C] { +abstract class Aggregator[-A, B, C] extends Serializable { /** A zero value for this aggregation. Should satisfy the property that any b + zero = b */
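To make the typed path concrete: the `agg` added above accepts a `TypedColumn` produced from an `Aggregator`, which `withEncoder` then binds to the input encoder. A hedged sketch against the 1.6-era API (the method set follows the Aggregator class shown in the diff; the `toColumn` call is left in a comment because it needs implicit encoders in scope):

import org.apache.spark.sql.expressions.Aggregator

// Sums the Int component of the grouped values.
object SumAgg extends Aggregator[(String, Int), Int, Int] {
  def zero: Int = 0
  def reduce(b: Int, a: (String, Int)): Int = b + a._2
  def merge(b1: Int, b2: Int): Int = b1 + b2
  def finish(reduction: Int): Int = reduction
}

// With a Dataset[(String, Int)] named ds:
// ds.groupBy(_._1).agg(SumAgg.toColumn)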
spark git commit: [SPARK-11742][STREAMING] Add the failure info to the batch lists
Repository: spark
Updated Branches: refs/heads/master 3c025087b -> bcea0bfda

[SPARK-11742][STREAMING] Add the failure info to the batch lists

(Screenshot: https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png)

Author: Shixiong Zhu

Closes #9711 from zsxwing/failure-info.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcea0bfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcea0bfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcea0bfd

Branch: refs/heads/master
Commit: bcea0bfda66a30ee86790b048de5cb47b4d0b32f
Parents: 3c02508
Author: Shixiong Zhu
Authored: Mon Nov 16 15:06:06 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 16 15:06:06 2015 -0800

--
.../spark/streaming/ui/AllBatchesTable.scala| 61 ++--
.../apache/spark/streaming/ui/BatchPage.scala | 49 ++--
.../org/apache/spark/streaming/ui/UIUtils.scala | 60 +++
3 files changed, 120 insertions(+), 50 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/bcea0bfd/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 125cafd..d339723 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")} } + /** + * Return the first failure reason if finding in the batches. + */ + protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { +batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption + } + + protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { +val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption +firstFailureReason.map { failureReason => + val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) + UIUtils.failureReasonCell( +failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false) +}.getOrElse(-) + } + protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval) @@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable( waitingBatches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { + private val firstFailureReason = getFirstFailureReason(runningBatches) + override protected def columns: Seq[Node] = super.columns ++ { Output Ops: Succeeded/Total - Status + Status ++ { + if (firstFailureReason.nonEmpty) { +Error + } else { +Nil + } +} } override protected def renderRows: Seq[Node] = { @@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable( } private def runningBatchRow(batch: BatchUIData): Seq[Node] = { -baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing +baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing ++ { + if (firstFailureReason.nonEmpty) { +getFirstFailureTableCell(batch) + } else { +Nil + } +} } private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { -baseRow(batch) ++
createOutputOperationProgressBar(batch) ++ queued++ { + if (firstFailureReason.nonEmpty) { +// Waiting batches have not run yet, so must have no failure reasons. +- + } else { +Nil + } +} } } private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("completed-batches-table", batchInterval) { + private val firstFailureReason = getFirstFailureReason(batches) + override protected def columns: Seq[Node] = super.columns ++ { Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")} - Output Ops: Succeeded/Total + Output Ops: Succeeded/Total ++ { + if (firstFailureReason.nonEmpty) { +Error + } else { +Nil + } +} } override protected def renderRows: Seq[Node] = { @@ -138,6 +183,12 @@ private[ui]
spark git commit: [SPARK-11742][STREAMING] Add the failure info to the batch lists
Repository: spark
Updated Branches: refs/heads/branch-1.6 64439f7d6 -> 3bd72eafc

[SPARK-11742][STREAMING] Add the failure info to the batch lists

(Screenshot: https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png)

Author: Shixiong Zhu

Closes #9711 from zsxwing/failure-info.

(cherry picked from commit bcea0bfda66a30ee86790b048de5cb47b4d0b32f)
Signed-off-by: Tathagata Das

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd72eaf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd72eaf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd72eaf

Branch: refs/heads/branch-1.6
Commit: 3bd72eafc5b7204d49fc1a3d587337fae88d8ae8
Parents: 64439f7
Author: Shixiong Zhu
Authored: Mon Nov 16 15:06:06 2015 -0800
Committer: Tathagata Das
Committed: Mon Nov 16 15:06:20 2015 -0800

--
.../spark/streaming/ui/AllBatchesTable.scala| 61 ++--
.../apache/spark/streaming/ui/BatchPage.scala | 49 ++--
.../org/apache/spark/streaming/ui/UIUtils.scala | 60 +++
3 files changed, 120 insertions(+), 50 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3bd72eaf/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala index 125cafd..d339723 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala @@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")} } + /** + * Return the first failure reason if finding in the batches.
+ */ + protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = { +batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption + } + + protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = { +val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption +firstFailureReason.map { failureReason => + val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason) + UIUtils.failureReasonCell( +failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false) +}.getOrElse(-) + } + protected def baseRow(batch: BatchUIData): Seq[Node] = { val batchTime = batch.batchTime.milliseconds val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval) @@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable( waitingBatches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) { + private val firstFailureReason = getFirstFailureReason(runningBatches) + override protected def columns: Seq[Node] = super.columns ++ { Output Ops: Succeeded/Total - Status + Status ++ { + if (firstFailureReason.nonEmpty) { +Error + } else { +Nil + } +} } override protected def renderRows: Seq[Node] = { @@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable( } private def runningBatchRow(batch: BatchUIData): Seq[Node] = { -baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing +baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ processing ++ { + if (firstFailureReason.nonEmpty) { +getFirstFailureTableCell(batch) + } else { +Nil + } +} } private def waitingBatchRow(batch: BatchUIData): Seq[Node] = { -baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ queued +baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ queued++ { + if (firstFailureReason.nonEmpty) { +// Waiting batches have not run yet, so must have no failure reasons. +- + } else { +Nil + } +} } } private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long) extends BatchTableBase("completed-batches-table", batchInterval) { + private val firstFailureReason = getFirstFailureReason(batches) + override protected def columns: Seq[Node] = super.columns ++ { Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")} - Output Ops: Succeeded/Total + Output Ops: Succeeded/Total ++ { + if (firstFailureReason.nonEmpty) { +Error +
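The core lookup the new Error columns rely on, in isolation: scan every output operation of every batch and keep the first failure reason, if any. A standalone sketch with simplified, hypothetical data types standing in for the real BatchUIData:

object FirstFailureSketch {
  case class OutputOpUIData(failureReason: Option[String])
  case class BatchUIData(outputOperations: Map[Int, OutputOpUIData])

  // Same shape as the getFirstFailureReason added in the diff above.
  def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] =
    batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      BatchUIData(Map(0 -> OutputOpUIData(None))),
      BatchUIData(Map(0 -> OutputOpUIData(Some("job aborted")))))
    println(getFirstFailureReason(batches)) // Some(job aborted)
  }
}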
spark git commit: [SPARK-11553][SQL] Primitive Row accessors should not convert null to default value
Repository: spark Updated Branches: refs/heads/branch-1.6 3bd72eafc -> 6c8e0c0ff [SPARK-11553][SQL] Primitive Row accessors should not convert null to default value Invocation of getters for type extending AnyVal returns default value (if field value is null) instead of throwing NPE. Please check comments for SPARK-11553 issue for more details. Author: Bartlomiej AlberskiCloses #9642 from alberskib/bugfix/SPARK-11553. (cherry picked from commit 31296628ac7cd7be71e0edca335dc8604f62bb47) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c8e0c0f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c8e0c0f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c8e0c0f Branch: refs/heads/branch-1.6 Commit: 6c8e0c0ff4fd3acbe0dbe7119920ee8b74d191dd Parents: 3bd72ea Author: Bartlomiej Alberski Authored: Mon Nov 16 15:14:38 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 15:14:48 2015 -0800 -- .../main/scala/org/apache/spark/sql/Row.scala | 32 - .../scala/org/apache/spark/sql/RowTest.scala| 20 +++ .../local/NestedLoopJoinNodeSuite.scala | 36 3 files changed, 65 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c8e0c0f/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 0f0f200..b14c66c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -191,7 +191,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getBoolean(i: Int): Boolean = getAs[Boolean](i) + def getBoolean(i: Int): Boolean = getAnyValAs[Boolean](i) /** * Returns the value at position i as a primitive byte. @@ -199,7 +199,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getByte(i: Int): Byte = getAs[Byte](i) + def getByte(i: Int): Byte = getAnyValAs[Byte](i) /** * Returns the value at position i as a primitive short. @@ -207,7 +207,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getShort(i: Int): Short = getAs[Short](i) + def getShort(i: Int): Short = getAnyValAs[Short](i) /** * Returns the value at position i as a primitive int. @@ -215,7 +215,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getInt(i: Int): Int = getAs[Int](i) + def getInt(i: Int): Int = getAnyValAs[Int](i) /** * Returns the value at position i as a primitive long. @@ -223,7 +223,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getLong(i: Int): Long = getAs[Long](i) + def getLong(i: Int): Long = getAnyValAs[Long](i) /** * Returns the value at position i as a primitive float. @@ -232,7 +232,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. 
*/ - def getFloat(i: Int): Float = getAs[Float](i) + def getFloat(i: Int): Float = getAnyValAs[Float](i) /** * Returns the value at position i as a primitive double. @@ -240,13 +240,12 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getDouble(i: Int): Double = getAs[Double](i) + def getDouble(i: Int): Double = getAnyValAs[Double](i) /** * Returns the value at position i as a String object. * * @throws ClassCastException when data type does not match. - * @throws NullPointerException when value is null. */ def getString(i: Int): String = getAs[String](i) @@ -318,6 +317,8 @@ trait Row extends Serializable { /** * Returns the value at position i. + * For primitive types if value is null it returns 'zero value' specific for primitive + * ie. 0 for Int - use isNullAt to ensure that value is not null * * @throws ClassCastException when
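The pitfall being fixed is plain JVM unboxing: casting null through a generic accessor silently yields the primitive's zero value. A standalone illustration (ordinary Scala, not the Spark Row code) of why the primitive getters must now null-check and throw:

object PrimitiveNullSketch {
  def getAs[T](v: Any): T = v.asInstanceOf[T] // erased cast, as the old getters used

  def main(args: Array[String]): Unit = {
    println(getAs[Int](null)) // prints 0: null unboxes to the default, hiding the null
    // The fixed accessors guard first, mirroring what getAnyValAs does in the patch:
    def getInt(v: Any): Int =
      if (v == null) throw new NullPointerException("value is null")
      else getAs[Int](v)
    println(getInt(42)) // 42; getInt(null) now throws instead of returning 0
  }
}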
spark git commit: [SPARKR][HOTFIX] Disable flaky SparkR package build test
Repository: spark
Updated Branches: refs/heads/branch-1.6 4f8c7e18f -> bb044ec22

[SPARKR][HOTFIX] Disable flaky SparkR package build test

See https://github.com/apache/spark/pull/9390#issuecomment-157160063 and https://gist.github.com/shivaram/3a2fecce60768a603dac for more information

Author: Shivaram Venkataraman

Closes #9744 from shivaram/sparkr-package-test-disable.

(cherry picked from commit ea6f53e48a911b49dc175ccaac8c80e0a1d97a09)
Signed-off-by: Andrew Or

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb044ec2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb044ec2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb044ec2

Branch: refs/heads/branch-1.6
Commit: bb044ec2278e5623e8fd3ae1f3172b5047b83439
Parents: 4f8c7e1
Author: Shivaram Venkataraman
Authored: Mon Nov 16 16:57:50 2015 -0800
Committer: Andrew Or
Committed: Mon Nov 16 16:58:02 2015 -0800

--
.../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala| 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/bb044ec2/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 42e748e..d494b0c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -369,7 +369,9 @@ class SparkSubmitSuite } } - test("correctly builds R packages included in a jar with --packages") { + // TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds. + // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log + ignore("correctly builds R packages included in a jar with --packages") { assume(RUtils.isRInstalled, "R isn't installed on this machine.") val main = MavenCoordinate("my.great.lib", "mylib", "0.1") val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
spark git commit: [SPARKR][HOTFIX] Disable flaky SparkR package build test
Repository: spark
Updated Branches: refs/heads/master fd14936be -> ea6f53e48

[SPARKR][HOTFIX] Disable flaky SparkR package build test

See https://github.com/apache/spark/pull/9390#issuecomment-157160063 and https://gist.github.com/shivaram/3a2fecce60768a603dac for more information

Author: Shivaram Venkataraman

Closes #9744 from shivaram/sparkr-package-test-disable.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ea6f53e4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ea6f53e4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ea6f53e4

Branch: refs/heads/master
Commit: ea6f53e48a911b49dc175ccaac8c80e0a1d97a09
Parents: fd14936
Author: Shivaram Venkataraman
Authored: Mon Nov 16 16:57:50 2015 -0800
Committer: Andrew Or
Committed: Mon Nov 16 16:57:50 2015 -0800

--
.../test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala| 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ea6f53e4/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 42e748e..d494b0c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -369,7 +369,9 @@ class SparkSubmitSuite } } - test("correctly builds R packages included in a jar with --packages") { + // TODO(SPARK-9603): Building a package is flaky on Jenkins Maven builds. + // See https://gist.github.com/shivaram/3a2fecce60768a603dac for a error log + ignore("correctly builds R packages included in a jar with --packages") { assume(RUtils.isRInstalled, "R isn't installed on this machine.") val main = MavenCoordinate("my.great.lib", "mylib", "0.1") val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
spark git commit: [SPARK-8658][SQL] AttributeReference's equals method compares all the members
Repository: spark Updated Branches: refs/heads/branch-1.6 6c8e0c0ff -> e042780cd [SPARK-8658][SQL] AttributeReference's equals method compares all the members This fix is to change the equals method to check all of the specified fields for equality of AttributeReference. Author: gatorsmileCloses #9216 from gatorsmile/namedExpressEqual. (cherry picked from commit 75ee12f09c2645c1ad682764d512965f641eb5c2) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e042780c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e042780c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e042780c Branch: refs/heads/branch-1.6 Commit: e042780cd95f5bf3988dd73bc4431c3b012c171c Parents: 6c8e0c0 Author: gatorsmile Authored: Mon Nov 16 15:22:12 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 15:22:24 2015 -0800 -- .../sql/catalyst/expressions/namedExpressions.scala | 4 +++- .../sql/catalyst/plans/logical/basicOperators.scala | 10 +- .../sql/catalyst/plans/physical/partitioning.scala | 12 ++-- 3 files changed, 14 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e042780c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index f80bcfc..e3dadda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -194,7 +194,9 @@ case class AttributeReference( def sameRef(other: AttributeReference): Boolean = this.exprId == other.exprId override def equals(other: Any): Boolean = other match { -case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType +case ar: AttributeReference => + name == ar.name && dataType == ar.dataType && nullable == ar.nullable && +metadata == ar.metadata && exprId == ar.exprId && qualifiers == ar.qualifiers case _ => false } http://git-wip-us.apache.org/repos/asf/spark/blob/e042780c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index e2b97b2..45630a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.types._ -import org.apache.spark.util.collection.OpenHashSet +import scala.collection.mutable.ArrayBuffer case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) @@ -244,12 +244,12 @@ private[sql] object Expand { */ private def buildNonSelectExprSet( bitmask: Int, - exprs: Seq[Expression]): OpenHashSet[Expression] = { -val set = new OpenHashSet[Expression](2) + exprs: Seq[Expression]): 
ArrayBuffer[Expression] = { +val set = new ArrayBuffer[Expression](2) var bit = exprs.length - 1 while (bit >= 0) { - if (((bitmask >> bit) & 1) == 0) set.add(exprs(bit)) + if (((bitmask >> bit) & 1) == 0) set += exprs(bit) bit -= 1 } @@ -279,7 +279,7 @@ private[sql] object Expand { (child.output :+ gid).map(expr => expr transformDown { // TODO this causes a problem when a column is used both for grouping and aggregation. -case x: Expression if nonSelectedGroupExprSet.contains(x) => +case x: Expression if nonSelectedGroupExprSet.exists(_.semanticEquals(x)) => // if the input attribute in the Invalid Grouping Expression set of for this group // replace it with constant null Literal.create(null, expr.dataType)
spark git commit: [SPARK-8658][SQL] AttributeReference's equals method compares all the members
Repository: spark Updated Branches: refs/heads/master 31296628a -> 75ee12f09 [SPARK-8658][SQL] AttributeReference's equals method compares all the members This fix is to change the equals method to check all of the specified fields for equality of AttributeReference. Author: gatorsmileCloses #9216 from gatorsmile/namedExpressEqual. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75ee12f0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75ee12f0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75ee12f0 Branch: refs/heads/master Commit: 75ee12f09c2645c1ad682764d512965f641eb5c2 Parents: 3129662 Author: gatorsmile Authored: Mon Nov 16 15:22:12 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 15:22:12 2015 -0800 -- .../sql/catalyst/expressions/namedExpressions.scala | 4 +++- .../sql/catalyst/plans/logical/basicOperators.scala | 10 +- .../sql/catalyst/plans/physical/partitioning.scala | 12 ++-- 3 files changed, 14 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/75ee12f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index f80bcfc..e3dadda 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -194,7 +194,9 @@ case class AttributeReference( def sameRef(other: AttributeReference): Boolean = this.exprId == other.exprId override def equals(other: Any): Boolean = other match { -case ar: AttributeReference => name == ar.name && exprId == ar.exprId && dataType == ar.dataType +case ar: AttributeReference => + name == ar.name && dataType == ar.dataType && nullable == ar.nullable && +metadata == ar.metadata && exprId == ar.exprId && qualifiers == ar.qualifiers case _ => false } http://git-wip-us.apache.org/repos/asf/spark/blob/75ee12f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index e2b97b2..45630a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.types._ -import org.apache.spark.util.collection.OpenHashSet +import scala.collection.mutable.ArrayBuffer case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) @@ -244,12 +244,12 @@ private[sql] object Expand { */ private def buildNonSelectExprSet( bitmask: Int, - exprs: Seq[Expression]): OpenHashSet[Expression] = { -val set = new OpenHashSet[Expression](2) + exprs: Seq[Expression]): ArrayBuffer[Expression] = { +val set = new ArrayBuffer[Expression](2) var bit = exprs.length - 1 while (bit >= 0) { - if 
(((bitmask >> bit) & 1) == 0) set.add(exprs(bit)) + if (((bitmask >> bit) & 1) == 0) set += exprs(bit) bit -= 1 } @@ -279,7 +279,7 @@ private[sql] object Expand { (child.output :+ gid).map(expr => expr transformDown { // TODO this causes a problem when a column is used both for grouping and aggregation. -case x: Expression if nonSelectedGroupExprSet.contains(x) => +case x: Expression if nonSelectedGroupExprSet.exists(_.semanticEquals(x)) => // if the input attribute in the Invalid Grouping Expression set of for this group // replace it with constant null Literal.create(null, expr.dataType) http://git-wip-us.apache.org/repos/asf/spark/blob/75ee12f0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala -- diff --git
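Why comparing only name, exprId, and dataType was too loose: two attribute references can agree on those three fields yet differ in nullability, metadata, or qualifiers. A minimal standalone sketch (a hypothetical simplified type, not the Spark class) where all-member equality, as a case class provides by construction, tells such attributes apart:

object AttrEqualsSketch {
  case class SimpleAttribute(
      name: String,
      dataType: String,
      nullable: Boolean,
      exprId: Long,
      qualifiers: Seq[String])

  def main(args: Array[String]): Unit = {
    val a = SimpleAttribute("col", "int", nullable = true, exprId = 1L, Seq("t1"))
    val b = a.copy(nullable = false)
    // Before the fix these would compare equal; with all members participating,
    // as in the patched equals, they do not.
    println(a == b) // false
  }
}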
spark git commit: [SPARK-11625][SQL] add java test for typed aggregate
Repository: spark Updated Branches: refs/heads/branch-1.6 e042780cd -> 4f8c7e18f [SPARK-11625][SQL] add java test for typed aggregate Author: Wenchen FanCloses #9591 from cloud-fan/agg-test. (cherry picked from commit fd14936be7beff543dbbcf270f2f9749f7a803c4) Signed-off-by: Michael Armbrust Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8c7e18 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8c7e18 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8c7e18 Branch: refs/heads/branch-1.6 Commit: 4f8c7e18f3103ee1fcf5a79c1d39cf5a81e78c87 Parents: e042780 Author: Wenchen Fan Authored: Mon Nov 16 15:32:49 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 15:35:21 2015 -0800 -- .../spark/api/java/function/Function.java | 2 +- .../org/apache/spark/sql/GroupedDataset.scala | 34 ++-- .../spark/sql/expressions/Aggregator.scala | 2 +- .../org/apache/spark/sql/JavaDatasetSuite.java | 56 .../spark/sql/DatasetAggregatorSuite.scala | 7 ++- 5 files changed, 92 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4f8c7e18/core/src/main/java/org/apache/spark/api/java/function/Function.java -- diff --git a/core/src/main/java/org/apache/spark/api/java/function/Function.java b/core/src/main/java/org/apache/spark/api/java/function/Function.java index d00551b..b9d9777 100644 --- a/core/src/main/java/org/apache/spark/api/java/function/Function.java +++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java @@ -25,5 +25,5 @@ import java.io.Serializable; * when mapping RDDs of other types. */ public interface Function extends Serializable { - public R call(T1 v1) throws Exception; + R call(T1 v1) throws Exception; } http://git-wip-us.apache.org/repos/asf/spark/blob/4f8c7e18/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala index ebcf4c8..467cd42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedDataset.scala @@ -145,9 +145,37 @@ class GroupedDataset[K, T] private[sql]( reduce(f.call _) } - // To ensure valid overloading. - protected def agg(expr: Column, exprs: Column*): DataFrame = -groupedData.agg(expr, exprs: _*) + /** + * Compute aggregates by specifying a series of aggregate columns, and return a [[DataFrame]]. + * We can call `as[T : Encoder]` to turn the returned [[DataFrame]] to [[Dataset]] again. + * + * The available aggregate methods are defined in [[org.apache.spark.sql.functions]]. + * + * {{{ + * // Selects the age of the oldest employee and the aggregate expense for each department + * + * // Scala: + * import org.apache.spark.sql.functions._ + * df.groupBy("department").agg(max("age"), sum("expense")) + * + * // Java: + * import static org.apache.spark.sql.functions.*; + * df.groupBy("department").agg(max("age"), sum("expense")); + * }}} + * + * We can also use `Aggregator.toColumn` to pass in typed aggregate functions. 
+ * + * @since 1.6.0 + */ + @scala.annotation.varargs + def agg(expr: Column, exprs: Column*): DataFrame = +groupedData.agg(withEncoder(expr), exprs.map(withEncoder): _*) + + private def withEncoder(c: Column): Column = c match { +case tc: TypedColumn[_, _] => + tc.withInputType(resolvedTEncoder.bind(dataAttributes), dataAttributes) +case _ => c + } /** * Internal helper function for building typed aggregations that return tuples. For simplicity http://git-wip-us.apache.org/repos/asf/spark/blob/4f8c7e18/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala index 360c9a5..72610e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.{Dataset, DataFrame, TypedColumn} * @tparam B The type of the intermediate value of the reduction. * @tparam C The type of the final result. */ -abstract class Aggregator[-A, B, C] { +abstract class
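For context, a hedged sketch of the round trip the reworked agg enables, assuming the 1.6-era typed API shown in these diffs (Dataset.groupBy(func), Aggregator.toColumn) and sqlContext.implicits._ in scope; the SumValue aggregator is hypothetical, not part of the patch.

    import org.apache.spark.sql.expressions.Aggregator

    // A typed aggregator over the grouped element type (String, Long).
    object SumValue extends Aggregator[(String, Long), Long, Long] {
      def zero: Long = 0L
      def reduce(b: Long, a: (String, Long)): Long = b + a._2
      def merge(b1: Long, b2: Long): Long = b1 + b2
      def finish(b: Long): Long = b
    }

    val ds = Seq(("a", 1L), ("a", 2L), ("b", 3L)).toDS()

    // agg now routes TypedColumns through withEncoder, binding the input
    // encoder to the data attributes, so typed and untyped aggregates can
    // share the same call from both Scala and Java.
    ds.groupBy(_._1).agg(SumValue.toColumn)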
spark git commit: [SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync
Repository: spark Updated Branches: refs/heads/branch-1.6 bb044ec22 -> e4abfe932 [SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync When we call AsyncRDDActions#takeAsync, another DAGScheduler#runJob is actually invoked from a separate thread, so we cannot get proper callsite information. The following screenshots show the web UI before and after this patch. Before: https://cloud.githubusercontent.com/assets/4736016/10914069/0ffc1306-8294-11e5-8e89-c4fadf58dd12.png https://cloud.githubusercontent.com/assets/4736016/10914070/0ffe84ce-8294-11e5-8b2a-69d36276bedb.png After: https://cloud.githubusercontent.com/assets/4736016/10914080/1d8cfb7a-8294-11e5-9e09-ede25c2563e8.png https://cloud.githubusercontent.com/assets/4736016/10914081/1d934e3a-8294-11e5-8b5e-e3dc37aaced3.png Author: Kousuke Saruta. Closes #9437 from sarutak/SPARK-11480. (cherry picked from commit 30f3cfda1cce8760f15c0a48a8c47f09a5167fca) Signed-off-by: Andrew Or Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4abfe93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4abfe93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4abfe93 Branch: refs/heads/branch-1.6 Commit: e4abfe9324e3fdebace8b33cc6616f382ad7ac45 Parents: bb044ec Author: Kousuke Saruta Authored: Mon Nov 16 16:59:16 2015 -0800 Committer: Andrew Or Committed: Mon Nov 16 16:59:23 2015 -0800 -- core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e4abfe93/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index ca1eb1f..d5e8536 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -66,6 +66,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi */ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val f = new ComplexFutureAction[Seq[T]] +val callSite = self.context.getCallSite f.run { // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which @@ -73,6 +74,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val results = new ArrayBuffer[T](num) val totalParts = self.partitions.length var partsScanned = 0 + self.context.setCallSite(callSite) while (results.size < num && partsScanned < totalParts) { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob.
spark git commit: [SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync
Repository: spark Updated Branches: refs/heads/master ea6f53e48 -> 30f3cfda1 [SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync When we call AsyncRDDActions#takeAsync, another DAGScheduler#runJob is actually invoked from a separate thread, so we cannot get proper callsite information. The following screenshots show the web UI before and after this patch. Before: https://cloud.githubusercontent.com/assets/4736016/10914069/0ffc1306-8294-11e5-8e89-c4fadf58dd12.png https://cloud.githubusercontent.com/assets/4736016/10914070/0ffe84ce-8294-11e5-8b2a-69d36276bedb.png After: https://cloud.githubusercontent.com/assets/4736016/10914080/1d8cfb7a-8294-11e5-9e09-ede25c2563e8.png https://cloud.githubusercontent.com/assets/4736016/10914081/1d934e3a-8294-11e5-8b5e-e3dc37aaced3.png Author: Kousuke Saruta. Closes #9437 from sarutak/SPARK-11480. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30f3cfda Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30f3cfda Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30f3cfda Branch: refs/heads/master Commit: 30f3cfda1cce8760f15c0a48a8c47f09a5167fca Parents: ea6f53e Author: Kousuke Saruta Authored: Mon Nov 16 16:59:16 2015 -0800 Committer: Andrew Or Committed: Mon Nov 16 16:59:16 2015 -0800 -- core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30f3cfda/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala index ca1eb1f..d5e8536 100644 --- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala @@ -66,6 +66,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi */ def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope { val f = new ComplexFutureAction[Seq[T]] +val callSite = self.context.getCallSite f.run { // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which @@ -73,6 +74,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi val results = new ArrayBuffer[T](num) val totalParts = self.partitions.length var partsScanned = 0 + self.context.setCallSite(callSite) while (results.size < num && partsScanned < totalParts) { // The number of partitions to try in this iteration. It is ok for this number to be // greater than totalParts because we actually cap it at totalParts in runJob.
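The two added lines follow a general pattern: capture thread-local state on the calling thread and re-install it on the thread that actually runs the job. A standalone illustration in plain Scala (no Spark types involved):

    // The caller tags its own thread; a worker thread cannot see that tag.
    val callSiteLocal = new ThreadLocal[String]
    callSiteLocal.set("takeAsync at App.scala:42") // hypothetical callsite

    val captured = callSiteLocal.get // capture on the calling thread

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        println(callSiteLocal.get)  // null: thread locals do not propagate
        callSiteLocal.set(captured) // the fix: restore before running the job
        println(callSiteLocal.get)  // now prints the caller's callsite
      }
    })
    worker.start()
    worker.join()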
spark git commit: [SPARK-11710] Document new memory management model
Repository: spark Updated Branches: refs/heads/master 30f3cfda1 -> 33a0ec937 [SPARK-11710] Document new memory management model Author: Andrew OrCloses #9676 from andrewor14/memory-management-docs. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33a0ec93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33a0ec93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33a0ec93 Branch: refs/heads/master Commit: 33a0ec93771ef5c3b388165b07cfab9014918d3b Parents: 30f3cfd Author: Andrew Or Authored: Mon Nov 16 17:00:18 2015 -0800 Committer: Andrew Or Committed: Mon Nov 16 17:00:18 2015 -0800 -- docs/configuration.md | 13 ++- docs/tuning.md| 54 ++ 2 files changed, 44 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/33a0ec93/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index d961f43..c496146 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -722,17 +722,20 @@ Apart from these, the following properties are also available, and may be useful Fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation -in the case of sparse, unusually large records. +in the case of sparse, unusually large records. Leaving this at the default value is +recommended. For more detail, see +this description. spark.memory.storageFraction 0.5 -Tâhe size of the storage region within the space set aside by -sâpark.memory.fraction. This region is not statically reserved, but dynamically -allocated as cache requests come in. âCached data may be evicted only if total storage exceeds -this region. +Amount of storage memory immune to eviction, expressed as a fraction of the size of the +region set aside by sâpark.memory.fraction. The higher this is, the less +working memory may be available to execution and tasks may spill to disk more often. +Leaving this at the default value is recommended. For more detail, see +this description. http://git-wip-us.apache.org/repos/asf/spark/blob/33a0ec93/docs/tuning.md -- diff --git a/docs/tuning.md b/docs/tuning.md index 879340a..a8fe7a4 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -88,9 +88,39 @@ than the "raw" data inside their fields. This is due to several reasons: but also pointers (typically 8 bytes each) to the next object in the list. * Collections of primitive types often store them as "boxed" objects such as `java.lang.Integer`. -This section will discuss how to determine the memory usage of your objects, and how to improve -it -- either by changing your data structures, or by storing data in a serialized format. -We will then cover tuning Spark's cache size and the Java garbage collector. +This section will start with an overview of memory management in Spark, then discuss specific +strategies the user can take to make more efficient use of memory in his/her application. In +particular, we will describe how to determine the memory usage of your objects, and how to +improve it -- either by changing your data structures, or by storing data in a serialized +format. We will then cover tuning Spark's cache size and the Java garbage collector. + +## Memory Management Overview + +Memory usage in Spark largely falls under one of two categories: execution and storage. 
+Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, +while storage memory refers to that used for caching and propagating internal data across the +cluster. In Spark, execution and storage share a unified region (M). When no execution memory is +used, storage can acquire all the available memory and vice versa. Execution may evict storage +if necessary, but only until total storage memory usage falls under a certain threshold (R). +In other words, `R` describes a subregion within `M` where cached blocks are never evicted. +Storage may not evict execution due to complexities in implementation. + +This design ensures several desirable properties. First, applications that do not use caching +can use the entire space for execution, obviating unnecessary disk spills. Second, applications +that do use caching can reserve a minimum storage space (R) where their data blocks are immune +to
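Reading the model numerically: with H the heap, M = H * spark.memory.fraction and R = M * spark.memory.storageFraction. A back-of-the-envelope sketch follows; the 0.75 value for spark.memory.fraction is an assumption from the 1.6-era defaults (only storageFraction's 0.5 appears in this diff), and the small reserved overhead is ignored.

    // Region sizes for a hypothetical 4 GB executor heap.
    val heapBytes = 4L * 1024 * 1024 * 1024
    val memoryFraction = 0.75  // spark.memory.fraction (assumed default)
    val storageFraction = 0.5  // spark.memory.storageFraction (default per this doc)

    val m = (heapBytes * memoryFraction).toLong // unified execution + storage
    val r = (m * storageFraction).toLong        // storage floor immune to eviction

    // Execution may evict cached blocks, but only until storage usage falls
    // to R; storage never evicts execution.
    println(s"M = $m bytes, R = $r bytes")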
spark git commit: [EXAMPLE][MINOR] Add missing awaitTermination in click stream example
Repository: spark Updated Branches: refs/heads/master 33a0ec937 -> bd10eb81c [EXAMPLE][MINOR] Add missing awaitTermination in click stream example Author: jerryshaoCloses #9730 from jerryshao/clickstream-fix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd10eb81 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd10eb81 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd10eb81 Branch: refs/heads/master Commit: bd10eb81c98e5e9df453f721943a3e82d9f74ae4 Parents: 33a0ec9 Author: jerryshao Authored: Mon Nov 16 17:02:21 2015 -0800 Committer: Andrew Or Committed: Mon Nov 16 17:02:21 2015 -0800 -- .../spark/examples/streaming/clickstream/PageViewStream.scala| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bd10eb81/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala index ec7d39d..4ef2386 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala @@ -18,7 +18,6 @@ // scalastyle:off println package org.apache.spark.examples.streaming.clickstream -import org.apache.spark.SparkContext._ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.examples.streaming.StreamingExamples // scalastyle:off @@ -88,7 +87,7 @@ object PageViewStream { // An external dataset we want to join to this stream val userList = ssc.sparkContext.parallelize( - Map(1 -> "Patrick Wendell", 2->"Reynold Xin", 3->"Matei Zaharia").toSeq) + Map(1 -> "Patrick Wendell", 2 -> "Reynold Xin", 3 -> "Matei Zaharia").toSeq) metric match { case "pageCounts" => pageCounts.print() @@ -106,6 +105,7 @@ object PageViewStream { } ssc.start() +ssc.awaitTermination() } } // scalastyle:on println - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
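The one-line fix matters because StreamingContext.start() returns immediately; without awaitTermination() the driver's main method exits and the job is torn down before any batch completes. A minimal skeleton showing the shape the example now follows (the socket source here is hypothetical, not the example's actual input):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object MinimalStream {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setAppName("MinimalStream"), Seconds(1))

        ssc.socketTextStream("localhost", 9999).count().print()

        ssc.start()            // non-blocking
        ssc.awaitTermination() // keeps the driver alive for the batches
      }
    }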
spark git commit: [SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for current_timestamp).
Repository: spark Updated Branches: refs/heads/master 540bf58f1 -> fbad920db [SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for current_timestamp). This patch adds an alias for current_timestamp (now function). Also fixes SPARK-9196 to re-enable the test case for current_timestamp. Author: Reynold XinCloses #9753 from rxin/SPARK-11768. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fbad920d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fbad920d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fbad920d Branch: refs/heads/master Commit: fbad920dbfd6f389dea852cdc159cacb0f4f6997 Parents: 540bf58 Author: Reynold Xin Authored: Mon Nov 16 20:47:46 2015 -0800 Committer: Reynold Xin Committed: Mon Nov 16 20:47:46 2015 -0800 -- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../org/apache/spark/sql/DateFunctionsSuite.scala | 18 -- 2 files changed, 13 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fbad920d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index a8f4d25..f9c04d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -244,6 +244,7 @@ object FunctionRegistry { expression[AddMonths]("add_months"), expression[CurrentDate]("current_date"), expression[CurrentTimestamp]("current_timestamp"), +expression[CurrentTimestamp]("now"), expression[DateDiff]("datediff"), expression[DateAdd]("date_add"), expression[DateFormatClass]("date_format"), http://git-wip-us.apache.org/repos/asf/spark/blob/fbad920d/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 1266d53..241cbd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -38,15 +38,21 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1) } - // This is a bad test. SPARK-9196 will fix it and re-enable it. - ignore("function current_timestamp") { + test("function current_timestamp and now") { val df1 = Seq((1, 2), (3, 1)).toDF("a", "b") checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1)) + // Execution in one query should return the same value -checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), - Row(true)) -assert(math.abs(sql("""SELECT CURRENT_TIMESTAMP()""").collect().head.getTimestamp( - 0).getTime - System.currentTimeMillis()) < 5000) +checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), Row(true)) + +// Current timestamp should return the current timestamp ... 
+val before = System.currentTimeMillis +val got = sql("SELECT CURRENT_TIMESTAMP()").collect().head.getTimestamp(0).getTime +val after = System.currentTimeMillis +assert(got >= before && got <= after) + +// Now alias +checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = NOW()"""), Row(true)) } val sdf = new SimpleDateFormat("-MM-dd HH:mm:ss") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
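A quick sanity check of the alias, mirroring the re-enabled test (assumes a SQLContext in scope):

    // Both names resolve to the same CurrentTimestamp expression, and the
    // timestamp is fixed per query, so this is deterministically true.
    sqlContext.sql("SELECT CURRENT_TIMESTAMP() = NOW()").collect()
    // => Array([true])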
spark git commit: [SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for current_timestamp).
Repository: spark Updated Branches: refs/heads/branch-1.6 5a6f40459 -> 07ac8e950 [SPARK-11768][SPARK-9196][SQL] Support now function in SQL (alias for current_timestamp). This patch adds an alias for current_timestamp (now function). Also fixes SPARK-9196 to re-enable the test case for current_timestamp. Author: Reynold XinCloses #9753 from rxin/SPARK-11768. (cherry picked from commit fbad920dbfd6f389dea852cdc159cacb0f4f6997) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07ac8e95 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07ac8e95 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07ac8e95 Branch: refs/heads/branch-1.6 Commit: 07ac8e95070ef1cb22641de9159479294a6cd774 Parents: 5a6f404 Author: Reynold Xin Authored: Mon Nov 16 20:47:46 2015 -0800 Committer: Reynold Xin Committed: Mon Nov 16 20:47:59 2015 -0800 -- .../sql/catalyst/analysis/FunctionRegistry.scala | 1 + .../org/apache/spark/sql/DateFunctionsSuite.scala | 18 -- 2 files changed, 13 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07ac8e95/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index a8f4d25..f9c04d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -244,6 +244,7 @@ object FunctionRegistry { expression[AddMonths]("add_months"), expression[CurrentDate]("current_date"), expression[CurrentTimestamp]("current_timestamp"), +expression[CurrentTimestamp]("now"), expression[DateDiff]("datediff"), expression[DateAdd]("date_add"), expression[DateFormatClass]("date_format"), http://git-wip-us.apache.org/repos/asf/spark/blob/07ac8e95/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala index 1266d53..241cbd0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala @@ -38,15 +38,21 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { assert(d0 <= d1 && d1 <= d2 && d2 <= d3 && d3 - d0 <= 1) } - // This is a bad test. SPARK-9196 will fix it and re-enable it. - ignore("function current_timestamp") { + test("function current_timestamp and now") { val df1 = Seq((1, 2), (3, 1)).toDF("a", "b") checkAnswer(df1.select(countDistinct(current_timestamp())), Row(1)) + // Execution in one query should return the same value -checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), - Row(true)) -assert(math.abs(sql("""SELECT CURRENT_TIMESTAMP()""").collect().head.getTimestamp( - 0).getTime - System.currentTimeMillis()) < 5000) +checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = CURRENT_TIMESTAMP()"""), Row(true)) + +// Current timestamp should return the current timestamp ... 
+val before = System.currentTimeMillis +val got = sql("SELECT CURRENT_TIMESTAMP()").collect().head.getTimestamp(0).getTime +val after = System.currentTimeMillis +assert(got >= before && got <= after) + +// Now alias +checkAnswer(sql("""SELECT CURRENT_TIMESTAMP() = NOW()"""), Row(true)) } val sdf = new SimpleDateFormat("-MM-dd HH:mm:ss") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-11553][SQL] Primitive Row accessors should not convert null to default value
Repository: spark Updated Branches: refs/heads/master bcea0bfda -> 31296628a [SPARK-11553][SQL] Primitive Row accessors should not convert null to default value Invocation of getters for type extending AnyVal returns default value (if field value is null) instead of throwing NPE. Please check comments for SPARK-11553 issue for more details. Author: Bartlomiej AlberskiCloses #9642 from alberskib/bugfix/SPARK-11553. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31296628 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31296628 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31296628 Branch: refs/heads/master Commit: 31296628ac7cd7be71e0edca335dc8604f62bb47 Parents: bcea0bf Author: Bartlomiej Alberski Authored: Mon Nov 16 15:14:38 2015 -0800 Committer: Michael Armbrust Committed: Mon Nov 16 15:14:38 2015 -0800 -- .../main/scala/org/apache/spark/sql/Row.scala | 32 - .../scala/org/apache/spark/sql/RowTest.scala| 20 +++ .../local/NestedLoopJoinNodeSuite.scala | 36 3 files changed, 65 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31296628/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index 0f0f200..b14c66c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -191,7 +191,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getBoolean(i: Int): Boolean = getAs[Boolean](i) + def getBoolean(i: Int): Boolean = getAnyValAs[Boolean](i) /** * Returns the value at position i as a primitive byte. @@ -199,7 +199,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getByte(i: Int): Byte = getAs[Byte](i) + def getByte(i: Int): Byte = getAnyValAs[Byte](i) /** * Returns the value at position i as a primitive short. @@ -207,7 +207,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getShort(i: Int): Short = getAs[Short](i) + def getShort(i: Int): Short = getAnyValAs[Short](i) /** * Returns the value at position i as a primitive int. @@ -215,7 +215,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getInt(i: Int): Int = getAs[Int](i) + def getInt(i: Int): Int = getAnyValAs[Int](i) /** * Returns the value at position i as a primitive long. @@ -223,7 +223,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getLong(i: Int): Long = getAs[Long](i) + def getLong(i: Int): Long = getAnyValAs[Long](i) /** * Returns the value at position i as a primitive float. @@ -232,7 +232,7 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getFloat(i: Int): Float = getAs[Float](i) + def getFloat(i: Int): Float = getAnyValAs[Float](i) /** * Returns the value at position i as a primitive double. 
@@ -240,13 +240,12 @@ trait Row extends Serializable { * @throws ClassCastException when data type does not match. * @throws NullPointerException when value is null. */ - def getDouble(i: Int): Double = getAs[Double](i) + def getDouble(i: Int): Double = getAnyValAs[Double](i) /** * Returns the value at position i as a String object. * * @throws ClassCastException when data type does not match. - * @throws NullPointerException when value is null. */ def getString(i: Int): String = getAs[String](i) @@ -318,6 +317,8 @@ trait Row extends Serializable { /** * Returns the value at position i. + * For primitive types if value is null it returns 'zero value' specific for primitive + * ie. 0 for Int - use isNullAt to ensure that value is not null * * @throws ClassCastException when data type does not match. */ @@ -325,6 +326,8 @@ trait Row extends Serializable { /** * Returns the value of a given
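The user-visible effect, sketched with a row holding a null field:

    import org.apache.spark.sql.Row

    val row = Row(null)

    // Before this patch row.getInt(0) silently returned 0 for the null field.
    // The primitive getters now go through getAnyValAs and throw
    // NullPointerException, so null handling must be explicit:
    val safe = if (row.isNullAt(0)) -1 else row.getInt(0) // -1 here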
spark git commit: [SPARK-11612][ML] Pipeline and PipelineModel persistence
Repository: spark Updated Branches: refs/heads/branch-1.6 32a69e4c1 -> 505eceef3 [SPARK-11612][ML] Pipeline and PipelineModel persistence Pipeline and PipelineModel extend Readable and Writable. Persistence succeeds only when all stages are Writable. Note: This PR reinstates tests for other read/write functionality. It should probably not get merged until [https://issues.apache.org/jira/browse/SPARK-11672] gets fixed. CC: mengxr Author: Joseph K. BradleyCloses #9674 from jkbradley/pipeline-io. (cherry picked from commit 1c5475f1401d2233f4c61f213d1e2c2ee9673067) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/505eceef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/505eceef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/505eceef Branch: refs/heads/branch-1.6 Commit: 505eceef303e3291253b35164fbec7e4390e8252 Parents: 32a69e4 Author: Joseph K. Bradley Authored: Mon Nov 16 17:12:39 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 16 17:12:48 2015 -0800 -- .../scala/org/apache/spark/ml/Pipeline.scala| 175 ++- .../org/apache/spark/ml/util/ReadWrite.scala| 4 +- .../org/apache/spark/ml/PipelineSuite.scala | 120 - .../spark/ml/util/DefaultReadWriteTest.scala| 25 +-- 4 files changed, 306 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/505eceef/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index a3e5940..25f0c69 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -22,12 +22,19 @@ import java.{util => ju} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import org.apache.spark.Logging +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.ml.param.{Param, ParamMap, Params} -import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util.Reader +import org.apache.spark.ml.util.Writer +import org.apache.spark.ml.util._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -82,7 +89,7 @@ abstract class PipelineStage extends Params with Logging { * an identity transformer. 
*/ @Experimental -class Pipeline(override val uid: String) extends Estimator[PipelineModel] { +class Pipeline(override val uid: String) extends Estimator[PipelineModel] with Writable { def this() = this(Identifiable.randomUID("pipeline")) @@ -166,6 +173,131 @@ class Pipeline(override val uid: String) extends Estimator[PipelineModel] { "Cannot have duplicate components in a pipeline.") theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur)) } + + override def write: Writer = new Pipeline.PipelineWriter(this) +} + +object Pipeline extends Readable[Pipeline] { + + override def read: Reader[Pipeline] = new PipelineReader + + override def load(path: String): Pipeline = read.load(path) + + private[ml] class PipelineWriter(instance: Pipeline) extends Writer { + +SharedReadWrite.validateStages(instance.getStages) + +override protected def saveImpl(path: String): Unit = + SharedReadWrite.saveImpl(instance, instance.getStages, sc, path) + } + + private[ml] class PipelineReader extends Reader[Pipeline] { + +/** Checked against metadata when loading model */ +private val className = "org.apache.spark.ml.Pipeline" + +override def load(path: String): Pipeline = { + val (uid: String, stages: Array[PipelineStage]) = SharedReadWrite.load(className, sc, path) + new Pipeline(uid).setStages(stages) +} + } + + /** Methods for [[Reader]] and [[Writer]] shared between [[Pipeline]] and [[PipelineModel]] */ + private[ml] object SharedReadWrite { + +import org.json4s.JsonDSL._ + +/** Check that all stages are Writable */ +def validateStages(stages: Array[PipelineStage]): Unit = { + stages.foreach { +case stage: Writable => // good +case other => + throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" + +s" because it contains a stage which does not implement Writable. Non-Writable stage:" + +s" ${other.uid} of
spark git commit: [SPARK-11612][ML] Pipeline and PipelineModel persistence
Repository: spark Updated Branches: refs/heads/master bd10eb81c -> 1c5475f14 [SPARK-11612][ML] Pipeline and PipelineModel persistence Pipeline and PipelineModel extend Readable and Writable. Persistence succeeds only when all stages are Writable. Note: This PR reinstates tests for other read/write functionality. It should probably not get merged until [https://issues.apache.org/jira/browse/SPARK-11672] gets fixed. CC: mengxr Author: Joseph K. BradleyCloses #9674 from jkbradley/pipeline-io. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c5475f1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c5475f1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c5475f1 Branch: refs/heads/master Commit: 1c5475f1401d2233f4c61f213d1e2c2ee9673067 Parents: bd10eb8 Author: Joseph K. Bradley Authored: Mon Nov 16 17:12:39 2015 -0800 Committer: Joseph K. Bradley Committed: Mon Nov 16 17:12:39 2015 -0800 -- .../scala/org/apache/spark/ml/Pipeline.scala| 175 ++- .../org/apache/spark/ml/util/ReadWrite.scala| 4 +- .../org/apache/spark/ml/PipelineSuite.scala | 120 - .../spark/ml/util/DefaultReadWriteTest.scala| 25 +-- 4 files changed, 306 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1c5475f1/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index a3e5940..25f0c69 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -22,12 +22,19 @@ import java.{util => ju} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -import org.apache.spark.Logging +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.ml.param.{Param, ParamMap, Params} -import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.util.Reader +import org.apache.spark.ml.util.Writer +import org.apache.spark.ml.util._ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -82,7 +89,7 @@ abstract class PipelineStage extends Params with Logging { * an identity transformer. 
*/ @Experimental -class Pipeline(override val uid: String) extends Estimator[PipelineModel] { +class Pipeline(override val uid: String) extends Estimator[PipelineModel] with Writable { def this() = this(Identifiable.randomUID("pipeline")) @@ -166,6 +173,131 @@ class Pipeline(override val uid: String) extends Estimator[PipelineModel] { "Cannot have duplicate components in a pipeline.") theStages.foldLeft(schema)((cur, stage) => stage.transformSchema(cur)) } + + override def write: Writer = new Pipeline.PipelineWriter(this) +} + +object Pipeline extends Readable[Pipeline] { + + override def read: Reader[Pipeline] = new PipelineReader + + override def load(path: String): Pipeline = read.load(path) + + private[ml] class PipelineWriter(instance: Pipeline) extends Writer { + +SharedReadWrite.validateStages(instance.getStages) + +override protected def saveImpl(path: String): Unit = + SharedReadWrite.saveImpl(instance, instance.getStages, sc, path) + } + + private[ml] class PipelineReader extends Reader[Pipeline] { + +/** Checked against metadata when loading model */ +private val className = "org.apache.spark.ml.Pipeline" + +override def load(path: String): Pipeline = { + val (uid: String, stages: Array[PipelineStage]) = SharedReadWrite.load(className, sc, path) + new Pipeline(uid).setStages(stages) +} + } + + /** Methods for [[Reader]] and [[Writer]] shared between [[Pipeline]] and [[PipelineModel]] */ + private[ml] object SharedReadWrite { + +import org.json4s.JsonDSL._ + +/** Check that all stages are Writable */ +def validateStages(stages: Array[PipelineStage]): Unit = { + stages.foreach { +case stage: Writable => // good +case other => + throw new UnsupportedOperationException("Pipeline write will fail on this Pipeline" + +s" because it contains a stage which does not implement Writable. Non-Writable stage:" + +s" ${other.uid} of type ${other.getClass}") + } +} + +/** + * Save metadata and stages for a [[Pipeline]] or [[PipelineModel]] + * -
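A hedged sketch of the round trip this adds; the path is hypothetical, and an empty stage array is used because a stage only qualifies if it implements Writable (validateStages throws UnsupportedOperationException otherwise):

    import org.apache.spark.ml.{Pipeline, PipelineStage}

    val pipeline = new Pipeline().setStages(Array.empty[PipelineStage])

    pipeline.write.save("/tmp/my-pipeline")          // metadata + stages
    val restored = Pipeline.load("/tmp/my-pipeline") // same uid and stages
    assert(restored.uid == pipeline.uid)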
spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.
Repository: spark Updated Branches: refs/heads/branch-1.6 505eceef3 -> e12ecfa36 [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder. The code was using the wrong API to add data to the internal composite buffer, causing buffers to leak in certain situations. Use the right API and enhance the tests to catch memory leaks. Also, avoid reusing the composite buffers when downstream handlers keep references to them; this seems to cause a few different issues even though the ref counting code seems to be correct, so instead pay the cost of copying a few bytes when that situation happens. Author: Marcelo VanzinCloses #9619 from vanzin/SPARK-11617. (cherry picked from commit 540bf58f18328c68107d6c616ffd70f3a4640054) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e12ecfa3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e12ecfa3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e12ecfa3 Branch: refs/heads/branch-1.6 Commit: e12ecfa360c54add488096493ce0dba15cbf3f8d Parents: 505ecee Author: Marcelo Vanzin Authored: Mon Nov 16 17:28:11 2015 -0800 Committer: Marcelo Vanzin Committed: Mon Nov 16 17:28:22 2015 -0800 -- .../network/util/TransportFrameDecoder.java | 47 -- .../util/TransportFrameDecoderSuite.java| 145 +++ 2 files changed, 151 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index 272ea84..5889562 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { buffer = in.alloc().compositeBuffer(); } -buffer.writeBytes(in); +buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes()); while (buffer.isReadable()) { - feedInterceptor(); - if (interceptor != null) { -continue; - } + discardReadBytes(); + if (!feedInterceptor()) { +ByteBuf frame = decodeNext(); +if (frame == null) { + break; +} - ByteBuf frame = decodeNext(); - if (frame != null) { ctx.fireChannelRead(frame); - } else { -break; } } -// We can't discard read sub-buffers if there are other references to the buffer (e.g. -// through slices used for framing). This assumes that code that retains references -// will call retain() from the thread that called "fireChannelRead()" above, otherwise -// ref counting will go awry. -if (buffer != null && buffer.refCnt() == 1) { +discardReadBytes(); + } + + private void discardReadBytes() { +// If the buffer's been retained by downstream code, then make a copy of the remaining +// bytes into a new buffer. Otherwise, just discard stale components. 
+if (buffer.refCnt() > 1) { + CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer(); + + if (buffer.readableBytes() > 0) { +ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes()); +spillBuf.writeBytes(buffer); +newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes()); + } + + buffer.release(); + buffer = newBuffer; +} else { buffer.discardReadComponents(); } } - protected ByteBuf decodeNext() throws Exception { + private ByteBuf decodeNext() throws Exception { if (buffer.readableBytes() < LENGTH_SIZE) { return null; } @@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { this.interceptor = interceptor; } - private void feedInterceptor() throws Exception { + /** + * @return Whether the interceptor is still active after processing the data. + */ + private boolean feedInterceptor() throws Exception { if (interceptor != null && !interceptor.handle(buffer)) { interceptor = null; } +return interceptor != null; } public static interface Interceptor { http://git-wip-us.apache.org/repos/asf/spark/blob/e12ecfa3/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java -- diff
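The API distinction behind the fix, illustrated outside Spark (assumes Netty 4.x on the classpath, as network-common has): addComponent takes ownership of the incoming buffer without copying but does not advance the composite's writerIndex, hence the manual bump in the patch; the old writeBytes call copied the data and left the input's refcount for code that never released it.

    import io.netty.buffer.Unpooled

    val in = Unpooled.wrappedBuffer("frame-bytes".getBytes("UTF-8"))
    val buffer = Unpooled.compositeBuffer()

    // What the patched channelRead does:
    buffer.addComponent(in)
    buffer.writerIndex(buffer.writerIndex() + in.readableBytes())

    assert(buffer.readableBytes() == "frame-bytes".length)

    // Releasing the composite now releases `in` as well, closing the leak.
    buffer.release()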
[1/2] spark git commit: Preparing Spark release v1.6.0-preview
Repository: spark Updated Branches: refs/heads/branch-1.6 e12ecfa36 -> 5a6f40459 Preparing Spark release v1.6.0-preview Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31db3610 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31db3610 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31db3610 Branch: refs/heads/branch-1.6 Commit: 31db361003715dc4f41176bd6cf572874f0b2676 Parents: e12ecfa Author: Patrick WendellAuthored: Mon Nov 16 18:43:43 2015 -0800 Committer: Patrick Wendell Committed: Mon Nov 16 18:43:43 2015 -0800 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- docker-integration-tests/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tags/pom.xml| 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 4b60ee0..fbabaa5 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 672e946..1b3e417 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 37e3f16..d32b93b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index dee0c4a..ee9de91 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index f5ab2a7..37b15bb 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index dceedcf..295455a 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0-SNAPSHOT +1.6.0 ../../pom.xml 
http://git-wip-us.apache.org/repos/asf/spark/blob/31db3610/external/flume-sink/pom.xml
[2/2] spark git commit: Preparing development version 1.6.0-SNAPSHOT
Preparing development version 1.6.0-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5a6f4045 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5a6f4045 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5a6f4045 Branch: refs/heads/branch-1.6 Commit: 5a6f40459066bfa5c9221b2b64a9f8f13ee5013e Parents: 31db361 Author: Patrick WendellAuthored: Mon Nov 16 18:43:49 2015 -0800 Committer: Patrick Wendell Committed: Mon Nov 16 18:43:49 2015 -0800 -- assembly/pom.xml| 2 +- bagel/pom.xml | 2 +- core/pom.xml| 2 +- docker-integration-tests/pom.xml| 2 +- examples/pom.xml| 2 +- external/flume-assembly/pom.xml | 2 +- external/flume-sink/pom.xml | 2 +- external/flume/pom.xml | 2 +- external/kafka-assembly/pom.xml | 2 +- external/kafka/pom.xml | 2 +- external/mqtt-assembly/pom.xml | 2 +- external/mqtt/pom.xml | 2 +- external/twitter/pom.xml| 2 +- external/zeromq/pom.xml | 2 +- extras/java8-tests/pom.xml | 2 +- extras/kinesis-asl-assembly/pom.xml | 2 +- extras/kinesis-asl/pom.xml | 2 +- extras/spark-ganglia-lgpl/pom.xml | 2 +- graphx/pom.xml | 2 +- launcher/pom.xml| 2 +- mllib/pom.xml | 2 +- network/common/pom.xml | 2 +- network/shuffle/pom.xml | 2 +- network/yarn/pom.xml| 2 +- pom.xml | 2 +- repl/pom.xml| 2 +- sql/catalyst/pom.xml| 2 +- sql/core/pom.xml| 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml| 2 +- streaming/pom.xml | 2 +- tags/pom.xml| 2 +- tools/pom.xml | 2 +- unsafe/pom.xml | 2 +- yarn/pom.xml| 2 +- 35 files changed, 35 insertions(+), 35 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index fbabaa5..4b60ee0 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/bagel/pom.xml -- diff --git a/bagel/pom.xml b/bagel/pom.xml index 1b3e417..672e946 100644 --- a/bagel/pom.xml +++ b/bagel/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index d32b93b..37e3f16 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/docker-integration-tests/pom.xml -- diff --git a/docker-integration-tests/pom.xml b/docker-integration-tests/pom.xml index ee9de91..dee0c4a 100644 --- a/docker-integration-tests/pom.xml +++ b/docker-integration-tests/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/examples/pom.xml -- diff --git a/examples/pom.xml b/examples/pom.xml index 37b15bb..f5ab2a7 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/external/flume-assembly/pom.xml -- diff --git a/external/flume-assembly/pom.xml b/external/flume-assembly/pom.xml index 295455a..dceedcf 100644 --- a/external/flume-assembly/pom.xml +++ b/external/flume-assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.10 -1.6.0 +1.6.0-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/5a6f4045/external/flume-sink/pom.xml -- diff --git 
a/external/flume-sink/pom.xml
Git Push Summary
Repository: spark Updated Tags: refs/tags/v1.6.0-preview [created] 31db36100
spark git commit: [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder.
Repository: spark Updated Branches: refs/heads/master 1c5475f14 -> 540bf58f1 [SPARK-11617][NETWORK] Fix leak in TransportFrameDecoder. The code was using the wrong API to add data to the internal composite buffer, causing buffers to leak in certain situations. Use the right API and enhance the tests to catch memory leaks. Also, avoid reusing the composite buffers when downstream handlers keep references to them; this seems to cause a few different issues even though the ref counting code seems to be correct, so instead pay the cost of copying a few bytes when that situation happens. Author: Marcelo VanzinCloses #9619 from vanzin/SPARK-11617. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/540bf58f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/540bf58f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/540bf58f Branch: refs/heads/master Commit: 540bf58f18328c68107d6c616ffd70f3a4640054 Parents: 1c5475f Author: Marcelo Vanzin Authored: Mon Nov 16 17:28:11 2015 -0800 Committer: Marcelo Vanzin Committed: Mon Nov 16 17:28:11 2015 -0800 -- .../network/util/TransportFrameDecoder.java | 47 -- .../util/TransportFrameDecoderSuite.java| 145 +++ 2 files changed, 151 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java index 272ea84..5889562 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportFrameDecoder.java @@ -56,32 +56,43 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { buffer = in.alloc().compositeBuffer(); } -buffer.writeBytes(in); +buffer.addComponent(in).writerIndex(buffer.writerIndex() + in.readableBytes()); while (buffer.isReadable()) { - feedInterceptor(); - if (interceptor != null) { -continue; - } + discardReadBytes(); + if (!feedInterceptor()) { +ByteBuf frame = decodeNext(); +if (frame == null) { + break; +} - ByteBuf frame = decodeNext(); - if (frame != null) { ctx.fireChannelRead(frame); - } else { -break; } } -// We can't discard read sub-buffers if there are other references to the buffer (e.g. -// through slices used for framing). This assumes that code that retains references -// will call retain() from the thread that called "fireChannelRead()" above, otherwise -// ref counting will go awry. -if (buffer != null && buffer.refCnt() == 1) { +discardReadBytes(); + } + + private void discardReadBytes() { +// If the buffer's been retained by downstream code, then make a copy of the remaining +// bytes into a new buffer. Otherwise, just discard stale components. 
+if (buffer.refCnt() > 1) { + CompositeByteBuf newBuffer = buffer.alloc().compositeBuffer(); + + if (buffer.readableBytes() > 0) { +ByteBuf spillBuf = buffer.alloc().buffer(buffer.readableBytes()); +spillBuf.writeBytes(buffer); +newBuffer.addComponent(spillBuf).writerIndex(spillBuf.readableBytes()); + } + + buffer.release(); + buffer = newBuffer; +} else { buffer.discardReadComponents(); } } - protected ByteBuf decodeNext() throws Exception { + private ByteBuf decodeNext() throws Exception { if (buffer.readableBytes() < LENGTH_SIZE) { return null; } @@ -127,10 +138,14 @@ public class TransportFrameDecoder extends ChannelInboundHandlerAdapter { this.interceptor = interceptor; } - private void feedInterceptor() throws Exception { + /** + * @return Whether the interceptor is still active after processing the data. + */ + private boolean feedInterceptor() throws Exception { if (interceptor != null && !interceptor.handle(buffer)) { interceptor = null; } +return interceptor != null; } public static interface Interceptor { http://git-wip-us.apache.org/repos/asf/spark/blob/540bf58f/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java -- diff --git a/network/common/src/test/java/org/apache/spark/network/util/TransportFrameDecoderSuite.java
spark git commit: [SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY
Repository: spark
Updated Branches:
  refs/heads/master fbad920db -> 75d202073

[SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY

As discussed in https://github.com/apache/spark/pull/9660 and https://github.com/apache/spark/pull/9060, I cleaned up unused imports, added a test for fixed-length byte arrays, and used a common function for writing Parquet metadata. For the fixed-length byte array test, I verified the encoding types with [parquet-tools](https://github.com/Parquet/parquet-mr/tree/master/parquet-tools).

Author: hyukjinkwon

Closes #9754 from HyukjinKwon/SPARK-11694-followup.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75d20207
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75d20207
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75d20207
Branch: refs/heads/master
Commit: 75d202073143d5a7f943890d8682b5b0cf9e3092
Parents: fbad920
Author: hyukjinkwon
Authored: Tue Nov 17 14:35:00 2015 +0800
Committer: Cheng Lian
Committed: Tue Nov 17 14:35:00 2015 +0800
--
 .../src/test/resources/dec-in-fixed-len.parquet |  Bin 0 -> 460 bytes
 .../datasources/parquet/ParquetIOSuite.scala    |   42 +++
 2 files changed, 15 insertions(+), 27 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/75d20207/sql/core/src/test/resources/dec-in-fixed-len.parquet
--
diff --git a/sql/core/src/test/resources/dec-in-fixed-len.parquet b/sql/core/src/test/resources/dec-in-fixed-len.parquet
new file mode 100644
index 000..6ad37d5
Binary files /dev/null and b/sql/core/src/test/resources/dec-in-fixed-len.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/75d20207/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a148fac..177ab42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.Collections
-
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@
 import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
       val errorMessage = intercept[Throwable] {
         sqlContext.read.parquet(path.toString).printSchema()
       }.toString
@@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       |}
     """.stripMargin)
 
+    val expectedSparkTypes = Seq(StringType, BinaryType)
+
    withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
-      val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType
-      assert(jsonDataType ===
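The refactor replaces the inlined footer plumbing with a shared `writeMetadata` helper. As a reference, here is a minimal sketch of what that helper can look like, reconstructed from the removed lines above; where the helper actually lives in the Spark tree (a test utility trait) and its exact signature may differ.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType

object ParquetMetadataHelper {
  /** Writes Parquet summary metadata for `schema` at `path`, with no row-group info. */
  def writeMetadata(schema: MessageType, path: Path, conf: Configuration): Unit = {
    val extraMetadata = Map.empty[String, String].asJava
    val fileMetadata = new FileMetaData(schema, extraMetadata, "Spark")
    val footers =
      List(new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))).asJava
    ParquetFileWriter.writeMetadataFile(conf, path, footers)
  }
}
```

Factoring this out means both test cases shown in the diff shrink to a single call, and any future test that needs a schema-only Parquet footer reuses the same code path.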
spark git commit: [SPARK-11745][SQL] Enable more JSON parsing options
Repository: spark
Updated Branches:
  refs/heads/master fd50fa4c3 -> 42de5253f

[SPARK-11745][SQL] Enable more JSON parsing options

This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comments in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options. Also updated the documentation to explain these options.

Scala
![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python
![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin

Closes #9724 from rxin/SPARK-11745.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42de5253
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42de5253
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42de5253
Branch: refs/heads/master
Commit: 42de5253f327bd7ee258b0efb5024f3847fa3b51
Parents: fd50fa4
Author: Reynold Xin
Authored: Mon Nov 16 00:06:14 2015 -0800
Committer: Reynold Xin
Committed: Mon Nov 16 00:06:14 2015 -0800
--
 python/pyspark/sql/readwriter.py                |  10 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  22 ++--
 .../apache/spark/sql/execution/SparkPlan.scala  |  17 +--
 .../datasources/json/InferSchema.scala          |  34 +++---
 .../datasources/json/JSONOptions.scala          |  64 +++
 .../datasources/json/JSONRelation.scala         |  20 ++--
 .../datasources/json/JacksonParser.scala        |  82 ++---
 .../json/JsonParsingOptionsSuite.scala          | 114 +++
 .../execution/datasources/json/JsonSuite.scala  |  29 +++--
 9 files changed, 286 insertions(+), 106 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/42de5253/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407..7b8ddb9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
             or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
 
+        You can set the following JSON-specific options to deal with non-standard JSON files:
+            * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
+                type
+            * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
+            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
+            * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
+                quotes
+            * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
+                (e.g. 00012)
+
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
         [('age', 'bigint'), ('name', 'string')]

http://git-wip-us.apache.org/repos/asf/spark/blob/42de5253/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a4..5872fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
@@ -227,6 +227,15 @@
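For context, here is a hedged usage sketch of the new options in Scala. The option names and defaults come from the commit message above; the snippet assumes a spark-shell session with `sqlContext` in scope, and `sample.json` is a hypothetical input path.

```scala
// Reading non-standard JSON with the options added by this patch.
// Each option is passed as a string key/value through DataFrameReader.
val df = sqlContext.read
  .option("allowComments", "true")            // tolerate // and /* */ comments
  .option("allowUnquotedFieldNames", "true")  // accept {field: 1}
  .option("allowSingleQuotes", "true")        // accept {'field': 1} (already the default)
  .option("allowNumericLeadingZeros", "true") // accept {"n": 00012}
  .json("sample.json")                        // hypothetical path

df.printSchema()
```

Collecting these flags into a single JSONOptions case class keeps InferSchema, JacksonParser, and JSONRelation in sync on one configuration object instead of threading four booleans through every call site.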
spark git commit: [SPARK-11745][SQL] Enable more JSON parsing options
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 053c63ecf -> a0f9cd77a

[SPARK-11745][SQL] Enable more JSON parsing options

This patch adds the following options to the JSON data source, for dealing with non-standard JSON files:
* `allowComments` (default `false`): ignores Java/C++ style comments in JSON records
* `allowUnquotedFieldNames` (default `false`): allows unquoted JSON field names
* `allowSingleQuotes` (default `true`): allows single quotes in addition to double quotes
* `allowNumericLeadingZeros` (default `false`): allows leading zeros in numbers (e.g. 00012)

To avoid passing a lot of options throughout the json package, I introduced a new JSONOptions case class to define all JSON config options. Also updated the documentation to explain these options.

Scala
![screen shot 2015-11-15 at 6 12 12 pm](https://cloud.githubusercontent.com/assets/323388/11172965/e3ace6ec-8bc4-11e5-805e-2d78f80d0ed6.png)

Python
![screen shot 2015-11-15 at 6 11 28 pm](https://cloud.githubusercontent.com/assets/323388/11172964/e23ed6ee-8bc4-11e5-8216-312f5983acd5.png)

Author: Reynold Xin

Closes #9724 from rxin/SPARK-11745.

(cherry picked from commit 42de5253f327bd7ee258b0efb5024f3847fa3b51)
Signed-off-by: Reynold Xin

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a0f9cd77
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a0f9cd77
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a0f9cd77
Branch: refs/heads/branch-1.6
Commit: a0f9cd77a73ae911d382ac8ddd52614800663704
Parents: 053c63e
Author: Reynold Xin
Authored: Mon Nov 16 00:06:14 2015 -0800
Committer: Reynold Xin
Committed: Mon Nov 16 00:06:21 2015 -0800
--
 python/pyspark/sql/readwriter.py                |  10 ++
 .../org/apache/spark/sql/DataFrameReader.scala  |  22 ++--
 .../apache/spark/sql/execution/SparkPlan.scala  |  17 +--
 .../datasources/json/InferSchema.scala          |  34 +++---
 .../datasources/json/JSONOptions.scala          |  64 +++
 .../datasources/json/JSONRelation.scala         |  20 ++--
 .../datasources/json/JacksonParser.scala        |  82 ++---
 .../json/JsonParsingOptionsSuite.scala          | 114 +++
 .../execution/datasources/json/JsonSuite.scala  |  29 +++--
 9 files changed, 286 insertions(+), 106 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 927f407..7b8ddb9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -153,6 +153,16 @@ class DataFrameReader(object):
             or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`StructType` for the input schema.
 
+        You can set the following JSON-specific options to deal with non-standard JSON files:
+            * ``primitivesAsString`` (default ``false``): infers all primitive values as a string \
+                type
+            * ``allowComments`` (default ``false``): ignores Java/C++ style comment in JSON records
+            * ``allowUnquotedFieldNames`` (default ``false``): allows unquoted JSON field names
+            * ``allowSingleQuotes`` (default ``true``): allows single quotes in addition to double \
+                quotes
+            * ``allowNumericLeadingZeros`` (default ``false``): allows leading zeros in numbers \
+                (e.g. 00012)
+
         >>> df1 = sqlContext.read.json('python/test_support/sql/people.json')
         >>> df1.dtypes
         [('age', 'bigint'), ('name', 'string')]

http://git-wip-us.apache.org/repos/asf/spark/blob/a0f9cd77/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 6a194a4..5872fbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,7 +29,7 @@
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.json.{JSONOptions, JSONRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import
spark git commit: [SPARK-11447][SQL] change NullType to StringType during binaryComparison between NullType and StringType
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 07ac8e950 -> 113410c12

[SPARK-11447][SQL] change NullType to StringType during binaryComparison between NullType and StringType

While executing the PromoteStrings rule, if one side of a binaryComparison is StringType and the other side is not, the current code promotes (casts) the StringType side to DoubleType; if the string does not contain a number, the cast yields a null value. So when doing <=> (null-safe equal) with null, the comparison does not filter out anything, causing the problem reported in this JIRA. This PR proposes the fix; please review. The problem only happens for <=>; other operators work fine:

scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+------+
|column|
+------+
+------+

scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+------+
|column|
+------+
+------+

scala> df.registerTempTable("DF")

scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]

scala> res27.show
+------+
|column|
+------+
+------+

Author: Kevin Yu

Closes #9720 from kevinyu98/working_on_spark-11447.

(cherry picked from commit e01865af0d5ebe11033de46c388c5c583876c187)
Signed-off-by: Yin Huai

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/113410c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/113410c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/113410c1
Branch: refs/heads/branch-1.6
Commit: 113410c12529caf5d1f528efeba4d22489ed78ec
Parents: 07ac8e9
Author: Kevin Yu
Authored: Mon Nov 16 22:54:29 2015 -0800
Committer: Yin Huai
Committed: Mon Nov 16 22:55:13 2015 -0800
--
 .../spark/sql/catalyst/analysis/HiveTypeCoercion.scala |  6 ++
 .../org/apache/spark/sql/ColumnExpressionSuite.scala   | 11 +++
 2 files changed, 17 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/113410c1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 92188ee..f90fc3c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -281,6 +281,12 @@ object HiveTypeCoercion {
       case p @ BinaryComparison(left @ DateType(), right @ TimestampType()) =>
         p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
 
+      // Checking NullType
+      case p @ BinaryComparison(left @ StringType(), right @ NullType()) =>
+        p.makeCopy(Array(left, Literal.create(null, StringType)))
+      case p @ BinaryComparison(left @ NullType(), right @ StringType()) =>
+        p.makeCopy(Array(Literal.create(null, StringType), right))
+
       case p @ BinaryComparison(left @ StringType(), right) if right.dataType != StringType =>
         p.makeCopy(Array(Cast(left, DoubleType), right))
       case p @ BinaryComparison(left, right @ StringType()) if left.dataType != StringType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/113410c1/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 3eae3f6..38c0eb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -368,6 +368,17 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       nullData.filter($"a" <=> $"b"),
       Row(1, 1) :: Row(null, null) :: Nil)
+
+    val nullData2 = sqlContext.createDataFrame(sparkContext.parallelize(
+        Row("abc") ::
+        Row(null) ::
+        Row("xyz") :: Nil),
+        StructType(Seq(StructField("a", StringType, true))))
+
+    checkAnswer(
+      nullData2.filter($"a" <=> null),
+      Row(null) :: Nil)
+
   }
 
   test(">") {
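A hedged REPL-style sketch of the fixed behavior, mirroring the `nullData2` test added above. The data and schema are taken from the diff; the `show()` output is what the new `checkAnswer` asserts, and the snippet assumes a spark-shell session with `sqlContext` and `sparkContext` in scope.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val nullData2 = sqlContext.createDataFrame(
  sparkContext.parallelize(Row("abc") :: Row(null) :: Row("xyz") :: Nil),
  StructType(Seq(StructField("a", StringType, true))))

// With the new coercion cases, the null literal is promoted to a StringType
// literal instead of forcing the string column through a Double cast, so
// only the genuinely null row satisfies the null-safe equality.
nullData2.filter(nullData2("a") <=> lit(null)).show()
// +----+
// |   a|
// +----+
// |null|
// +----+
```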
spark git commit: [MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite
Repository: spark
Updated Branches:
  refs/heads/master e01865af0 -> d79d8b08f

[MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite

The randomly generated ArrayData used for the UDT `ExamplePoint` in `RowEncoderSuite` sometimes doesn't have enough elements, and in that case the test fails. This patch fixes it.

Author: Liang-Chi Hsieh

Closes #9757 from viirya/fix-randomgenerated-udt.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d79d8b08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d79d8b08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d79d8b08
Branch: refs/heads/master
Commit: d79d8b08ff69b30b02fe87839e695e29bfea5ace
Parents: e01865a
Author: Liang-Chi Hsieh
Authored: Mon Nov 16 23:16:17 2015 -0800
Committer: Davies Liu
Committed: Mon Nov 16 23:16:17 2015 -0800
--
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d79d8b08/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index c868dde..46c6e0d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.encoders
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData}
@@ -59,7 +61,12 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
   override def deserialize(datum: Any): ExamplePoint = {
     datum match {
       case values: ArrayData =>
-        new ExamplePoint(values.getDouble(0), values.getDouble(1))
+        if (values.numElements() > 1) {
+          new ExamplePoint(values.getDouble(0), values.getDouble(1))
+        } else {
+          val random = new Random()
+          new ExamplePoint(random.nextDouble(), random.nextDouble())
+        }
     }
   }
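To make the failure mode concrete, here is a minimal, self-contained sketch of the guard the patch introduces. `Point` and the `IndexedSeq` parameter are simplified stand-ins for the test-only `ExamplePoint` and `ArrayData` types:

```scala
import scala.util.Random

// Simplified stand-in for the test-only ExamplePoint payload.
final case class Point(x: Double, y: Double)

// Before the patch, reading element 1 of a one-element buffer throws, which
// is how a randomly generated, too-short ArrayData could break the test.
// After the patch, the deserializer falls back to random coordinates.
def deserialize(values: IndexedSeq[Double]): Point =
  if (values.length > 1) {
    Point(values(0), values(1))
  } else {
    val random = new Random()
    Point(random.nextDouble(), random.nextDouble())
  }

// Example: a too-short buffer no longer crashes.
val p = deserialize(Vector(1.0))
```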
spark git commit: [MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 113410c12 -> 2ae1fa074

[MINOR] [SQL] Fix randomly generated ArrayData in RowEncoderSuite

The randomly generated ArrayData used for the UDT `ExamplePoint` in `RowEncoderSuite` sometimes doesn't have enough elements, and in that case the test fails. This patch fixes it.

Author: Liang-Chi Hsieh

Closes #9757 from viirya/fix-randomgenerated-udt.

(cherry picked from commit d79d8b08ff69b30b02fe87839e695e29bfea5ace)
Signed-off-by: Davies Liu

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ae1fa07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ae1fa07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ae1fa07
Branch: refs/heads/branch-1.6
Commit: 2ae1fa074c2dedc67bdde0ac48e2af9476d4cba1
Parents: 113410c
Author: Liang-Chi Hsieh
Authored: Mon Nov 16 23:16:17 2015 -0800
Committer: Davies Liu
Committed: Mon Nov 16 23:16:33 2015 -0800
--
 .../spark/sql/catalyst/encoders/RowEncoderSuite.scala | 9 -
 1 file changed, 8 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/2ae1fa07/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index c868dde..46c6e0d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.encoders
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.util.{GenericArrayData, ArrayData}
@@ -59,7 +61,12 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
   override def deserialize(datum: Any): ExamplePoint = {
     datum match {
       case values: ArrayData =>
-        new ExamplePoint(values.getDouble(0), values.getDouble(1))
+        if (values.numElements() > 1) {
+          new ExamplePoint(values.getDouble(0), values.getDouble(1))
+        } else {
+          val random = new Random()
+          new ExamplePoint(random.nextDouble(), random.nextDouble())
+        }
     }
   }
spark git commit: [SPARK-11447][SQL] change NullType to StringType during binaryComparison between NullType and StringType
Repository: spark
Updated Branches:
  refs/heads/master 75d202073 -> e01865af0

[SPARK-11447][SQL] change NullType to StringType during binaryComparison between NullType and StringType

While executing the PromoteStrings rule, if one side of a binaryComparison is StringType and the other side is not, the current code promotes (casts) the StringType side to DoubleType; if the string does not contain a number, the cast yields a null value. So when doing <=> (null-safe equal) with null, the comparison does not filter out anything, causing the problem reported in this JIRA. This PR proposes the fix; please review. The problem only happens for <=>; other operators work fine:

scala> val filteredDF = df.filter(df("column") > (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+------+
|column|
+------+
+------+

scala> val filteredDF = df.filter(df("column") === (new Column(Literal(null))))
filteredDF: org.apache.spark.sql.DataFrame = [column: string]

scala> filteredDF.show
+------+
|column|
+------+
+------+

scala> df.registerTempTable("DF")

scala> sqlContext.sql("select * from DF where 'column' = NULL")
res27: org.apache.spark.sql.DataFrame = [column: string]

scala> res27.show
+------+
|column|
+------+
+------+

Author: Kevin Yu

Closes #9720 from kevinyu98/working_on_spark-11447.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e01865af
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e01865af
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e01865af
Branch: refs/heads/master
Commit: e01865af0d5ebe11033de46c388c5c583876c187
Parents: 75d2020
Author: Kevin Yu
Authored: Mon Nov 16 22:54:29 2015 -0800
Committer: Yin Huai
Committed: Mon Nov 16 22:54:29 2015 -0800
--
 .../spark/sql/catalyst/analysis/HiveTypeCoercion.scala |  6 ++
 .../org/apache/spark/sql/ColumnExpressionSuite.scala   | 11 +++
 2 files changed, 17 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e01865af/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index 92188ee..f90fc3c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -281,6 +281,12 @@ object HiveTypeCoercion {
       case p @ BinaryComparison(left @ DateType(), right @ TimestampType()) =>
         p.makeCopy(Array(Cast(left, StringType), Cast(right, StringType)))
 
+      // Checking NullType
+      case p @ BinaryComparison(left @ StringType(), right @ NullType()) =>
+        p.makeCopy(Array(left, Literal.create(null, StringType)))
+      case p @ BinaryComparison(left @ NullType(), right @ StringType()) =>
+        p.makeCopy(Array(Literal.create(null, StringType), right))
+
       case p @ BinaryComparison(left @ StringType(), right) if right.dataType != StringType =>
         p.makeCopy(Array(Cast(left, DoubleType), right))
       case p @ BinaryComparison(left, right @ StringType()) if left.dataType != StringType =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e01865af/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
index 3eae3f6..38c0eb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala
@@ -368,6 +368,17 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     checkAnswer(
       nullData.filter($"a" <=> $"b"),
       Row(1, 1) :: Row(null, null) :: Nil)
+
+    val nullData2 = sqlContext.createDataFrame(sparkContext.parallelize(
+        Row("abc") ::
+        Row(null) ::
+        Row("xyz") :: Nil),
+        StructType(Seq(StructField("a", StringType, true))))
+
+    checkAnswer(
+      nullData2.filter($"a" <=> null),
+      Row(null) :: Nil)
+
   }
 
   test(">") {