[GitHub] spark issue #16804: [SPARK-19459][SQL] Add Hive datatype (char/varchar) to S...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16804

retest this please
[GitHub] spark issue #16873: [SPARK-19509][SQL] Grouping Sets do not respect nullable...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16873

cc @cloud-fan @jiangxb1987
[GitHub] spark pull request #16873: [SPARK-19509][SQL] Grouping Sets do not respect n...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/16873

[SPARK-19509][SQL] Grouping Sets do not respect nullable grouping columns

## What changes were proposed in this pull request?

The analyzer currently does not check if a column used in grouping sets is actually nullable itself. This can cause the nullability of the column to be incorrect, which can cause null pointer exceptions down the line. This PR fixes that by also considering the nullability of the column. This is only a problem for Spark 2.1 and below; the latest master uses a different approach.

## How was this patch tested?

Added a regression test to `SQLQueryTestSuite.grouping_set`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hvanhovell/spark SPARK-19509

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16873.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16873

commit b54ef732d9b6a680bc73f3a27e7fed68df75c68c
Author: Herman van Hovell
Date: 2017-02-09T12:45:42Z

    Grouping set should respect nullability of input.
[GitHub] spark pull request #16872: [SPARK-19514] Making range interruptible.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16872#discussion_r100308080

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -127,4 +133,28 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("Cancelling stage in a query with Range.") {
+    val listener = new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        Thread.sleep(100)
--- End diff --

Can we do something a little less coarse (it probably also blocks other events from being emitted), and with less potential for flakiness?
[GitHub] spark pull request #16872: [SPARK-19514] Making range interruptible.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16872#discussion_r100309681

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -443,6 +443,10 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
        |     if (shouldStop()) return;
        |   }
        |
+       |   if (TaskContext.get().isInterrupted()) {
--- End diff --

You could also move TaskContext.get() out of the loop. It should not change during evaluation.
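A minimal sketch of the suggestion above, assuming the codegen API of the Spark version under review; a standalone CodegenContext stands in for the one available inside RangeExec's code generation, and the variable name is illustrative, not the merged change:

    import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

    val ctx = new CodegenContext
    // Cache the TaskContext once per partition in a mutable state variable ...
    val taskContext = ctx.freshName("taskContext")
    ctx.addMutableState("TaskContext", taskContext, s"$taskContext = TaskContext.get();")
    // ... so the generated loop only reads the cached reference on every iteration:
    val interruptionCheck =
      s"""
         |if ($taskContext.isInterrupted()) {
         |  throw new TaskKilledException();
         |}
       """.stripMargin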
[GitHub] spark pull request #16872: [SPARK-19514] Making range interruptible.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16872#discussion_r100310307

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -127,4 +133,28 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("Cancelling stage in a query with Range.") {
+    val listener = new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        Thread.sleep(100)
+        sparkContext.cancelStage(taskStart.stageId)
+      }
+    }
+
+    sparkContext.addSparkListener(listener)
+    for (codegen <- Seq(true, false)) {
+      withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+        val ex = intercept[SparkException] {
+          spark.range(10L).crossJoin(spark.range(10L))
+            .toDF("a", "b").agg(sum("a"), sum("b")).collect()
+        }
+        val msg = if (ex.getCause() != null) ex.getCause().getMessage() else ex.getMessage()
--- End diff --

Which exception do we actually expect here?
[GitHub] spark issue #16873: [SPARK-19509][SQL] Grouping Sets do not respect nullable...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16873

also cc @stanzhai
[GitHub] spark issue #16869: [SPARK-19025][SQL] Remove SQL builder for operators
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16869

cc @rxin @yhuai wdyt? Should we be bold and remove this?
[GitHub] spark issue #16869: [SPARK-19025][SQL] Remove SQL builder for operators
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16869

LGTM. I'd like to wait a little with merging this in order to build some consensus.
[GitHub] spark pull request #16872: [SPARK-19514] Making range interruptible.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16872#discussion_r100350259

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -127,4 +133,28 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("Cancelling stage in a query with Range.") {
+    val listener = new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        Thread.sleep(100)
--- End diff --

cc @zsxwing do you have a suggestion to do this nicely?
[GitHub] spark pull request #16873: [SPARK-19509][SQL] Grouping Sets do not respect n...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16873#discussion_r100357789

--- Diff: sql/core/src/test/resources/sql-tests/inputs/grouping_set.sql ---
@@ -13,5 +18,8 @@ SELECT a, b, c, count(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((a));
 -- SPARK-17849: grouping set throws NPE #3
 SELECT a, b, c, count(d) FROM grouping GROUP BY a, b, c GROUPING SETS ((c));
 
+-- SPARK-19509: grouping set should honor input nullability
+SELECT COUNT(1) FROM grouping_null GROUP BY e GROUPING SETS (e);
-
+DROP VIEW IF EXISTS grouping;
+DROP VIEW IF EXISTS grouping_null;
--- End diff --

Yeah lemme fix that.
[GitHub] spark pull request #16610: [SPARK-19254][SQL] Support Seq, Map, and Struct i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16610#discussion_r100360526

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -102,6 +102,27 @@ object functions {
     Column(literalExpr)
   }
 
+  /**
+   * Creates a [[Column]] of literal value.
+   *
+   * The passed in object is returned directly if it is already a [[Column]].
+   * If the object is a Scala Symbol, it is converted into a [[Column]] also.
+   * Otherwise, a new [[Column]] is created to represent the literal value.
+   *
+   * @group normal_funcs
+   * @since 2.2.0
+   */
+  def lit2[T : TypeTag](literal: T): Column = {
--- End diff --

Do you think there is a way we can actually avoid this? If we must, why not name it `typedLit`? cc @cloud-fan
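A hedged usage sketch of what the suggested rename would look like from the caller's side, assuming the rename to `typedLit` is adopted and a SparkSession named `spark` is in scope. Unlike `lit`, it captures the full Scala type through a TypeTag, so complex literals such as Seq and Map work:

    import org.apache.spark.sql.functions.typedLit

    // Literal columns whose types cannot be inferred from a plain java object.
    val withComplexLiterals = spark.range(1).select(
      typedLit(Seq(1, 2, 3)).as("int_array"),
      typedLit(Map("a" -> 1, "b" -> 2)).as("string_int_map"))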
[GitHub] spark pull request #16610: [SPARK-19254][SQL] Support Seq, Map, and Struct i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16610#discussion_r100360787

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala ---
@@ -153,6 +154,12 @@ object Literal {
     Literal(CatalystTypeConverters.convertToCatalyst(v), dataType)
   }
 
+  def create[T : TypeTag](v: T): Literal = {
+    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
+    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
--- End diff --

Call `create(v, dataType)` instead?
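What the reviewer is proposing, sketched against the quoted diff inside the Literal companion object (this is the suggestion only, not necessarily what was merged): derive the schema via reflection and then delegate to the existing `create(v, dataType)` overload instead of building a converter inline.

    def create[T : TypeTag](v: T): Literal = {
      // Infer the Catalyst data type from the Scala type tag ...
      val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
      // ... and reuse the existing overload to perform the conversion.
      create(v, dataType)
    }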
[GitHub] spark issue #16852: [SPARK-19512][SQL] codegen for compare structs fails
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16852

LGTM - merging to master.
[GitHub] spark issue #16852: [SPARK-19512][SQL] codegen for compare structs fails
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16852

@bogdanrdc can you create a backport for branch-2.1?
[GitHub] spark issue #16869: [SPARK-19025][SQL] Remove SQL builder for operators
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16869

I had an offline discussion with @rxin about this, and we have decided to merge this one.
[GitHub] spark pull request #16872: [SPARK-19514] Making range interruptible.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16872#discussion_r100389578

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---
@@ -127,4 +133,28 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("Cancelling stage in a query with Range.") {
+    val listener = new SparkListener {
+      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+        Thread.sleep(100)
--- End diff --

The problem is that we are trying to kill a query once it has actually started executing. If we do not call `sleep(...)`, the query gets killed before it starts executing. The other option is to wait for `onExecutorMetricsUpdate(..)`, but that is tied to the heartbeat (10s) and makes the test much slower. Should we move to the other approach, or do you have any other suggestions?
[GitHub] spark pull request #16610: [SPARK-19254][SQL] Support Seq, Map, and Struct i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16610#discussion_r100397651

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala ---
@@ -153,6 +154,12 @@ object Literal {
     Literal(CatalystTypeConverters.convertToCatalyst(v), dataType)
   }
 
+  def create[T : TypeTag](v: T): Literal = {
+    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
+    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
--- End diff --

Ah ok.
[GitHub] spark pull request #16610: [SPARK-19254][SQL] Support Seq, Map, and Struct i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16610#discussion_r100397761

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala ---
@@ -153,6 +154,12 @@ object Literal {
     Literal(CatalystTypeConverters.convertToCatalyst(v), dataType)
   }
 
+  def create[T : TypeTag](v: T): Literal = {
+    val ScalaReflection.Schema(dataType, _) = ScalaReflection.schemaFor[T]
+    val convert = CatalystTypeConverters.createToCatalystConverter(dataType)
--- End diff --

Shouldn't you be going to talks :)
[GitHub] spark issue #16873: [SPARK-19509][SQL] Grouping Sets do not respect nullable...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16873

I am merging this.
[GitHub] spark pull request #16873: [SPARK-19509][SQL] Grouping Sets do not respect n...
Github user hvanhovell closed the pull request at: https://github.com/apache/spark/pull/16873
[GitHub] spark issue #16874: [SPARK-19509][SQL]Fix a NPE problem in grouping sets whe...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16874

@stanzhai I have merged my PR, and assigned the PR to your name. Could you close this?
[GitHub] spark issue #16875: [BACKPORT-2.1][SPARK-19512][SQL] codegen for compare str...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16875

LGTM
[GitHub] spark pull request #16886: [SPARK-19548][SQL] Support Hive UDFs which return...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/16886

[SPARK-19548][SQL] Support Hive UDFs which return typed Lists/Maps

## What changes were proposed in this pull request?

This PR adds support for Hive UDFs that return fully typed java Lists or Maps, for example `List<String>` or `Map<String, Integer>`. It is also allowed to nest these structures, for example `Map<String, List<Integer>>`. Raw collections or collections using wildcards are still not supported, and cannot be supported due to the lack of type information.

## How was this patch tested?

Modified existing tests in `HiveUDFSuite`, and I have added test cases for raw collections and collections using wildcards.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hvanhovell/spark SPARK-19548

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/16886.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #16886

commit 56cdabdd0d19d6a07229202e939c8bd5beef4b12
Author: Herman van Hovell
Date: 2017-02-10T11:21:02Z

    Support Hive UDFs which return typed Lists/Maps
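To make the feature concrete, a hedged illustration (not one of the actual test UDFs in the PR) of the kind of UDF this enables: the evaluate method declares a fully parameterized java.util.List return type, so the element type survives in the method's generic signature instead of being lost to erasure.

    import java.util.{Arrays => JArrays, List => JList}
    import org.apache.hadoop.hive.ql.exec.UDF

    // Returns java.util.List[String]; the String element type is visible to reflection
    // via the method's generic return type and can be mapped to ArrayType(StringType).
    class TypedListUDF extends UDF {
      def evaluate(value: String): JList[String] = JArrays.asList(value, value)
    }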
[GitHub] spark issue #16886: [SPARK-19548][SQL] Support Hive UDFs which return typed ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16886

cc @cloud-fan @yhuai @maropu
[GitHub] spark pull request #16881: [SPARK-19543] from_json fails when the input row ...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16881#discussion_r100520672

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -496,7 +496,7 @@ case class JsonToStruct(schema: StructType, options: Map[String, String], child:
   override def dataType: DataType = schema
 
   override def nullSafeEval(json: Any): Any = {
-    try parser.parse(json.toString).head catch {
+    try parser.parse(json.toString).headOption.orNull catch {
--- End diff --

@HyukjinKwon that seems fair. Feel free to work on this.
[GitHub] spark issue #16881: [SPARK-19543] from_json fails when the input row is empt...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16881

LGTM - merging to master/2.1. Thanks!
[GitHub] spark issue #16882: [SPARK-19544][SQL] Improve error message when some colum...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16882

@HyukjinKwon Can you also improve the error message? I don't think `StructType(StructField(_1,StringType,true), StructField(_2,StringType,true)) <> StructType(StructField(_1,StringType,true), StructField(_2,IntegerType,false))` is readable at all. Can we use catalog string instead? Other than that, this looks fine.
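What "use catalog string" buys, sketched with an illustrative schema: `DataType.catalogString` renders a schema in the compact SQL-style form, which is much easier to scan in an error message than the default StructType rendering quoted above.

    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("_1", StringType, nullable = true),
      StructField("_2", IntegerType, nullable = false)))

    // Default rendering, as quoted in the message above:
    //   StructType(StructField(_1,StringType,true), StructField(_2,IntegerType,false))
    // Compact rendering:
    schema.catalogString  // struct<_1:string,_2:int>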
[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16870#discussion_r100524361

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala ---
@@ -500,6 +516,20 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
       Row(date1.getTime / 1000L), Row(date2.getTime / 1000L)))
     checkAnswer(df.selectExpr(s"to_unix_timestamp(s, '$fmt')"), Seq(
       Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
+
+
+    val x1 = "2015-07-24 10:00:00"
+    val x2 = "2015-25-07 02:02:02"
+    val x3 = "2015-07-24 25:02:02"
+    val x4 = "2015-24-07 26:02:02"
+    val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
+    val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
+
+    val df1 = Seq(x1, x2, x3, x4).toDF("x")
+    checkAnswer(df1.selectExpr("to_unix_timestamp(x)"), Seq(
+      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+    checkAnswer(df1.selectExpr(s"to_unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
+      Row(ts3.getTime / 1000L), Row(ts4.getTime / 1000L), Row(null), Row(null)))
--- End diff --

Shouldn't the order be `Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)`? It does not matter for testing since we sort results, but it makes it less confusing.
[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16870#discussion_r100523512

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala ---
@@ -477,6 +475,24 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
       Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
 
+
+    val x1 = "2015-07-24 10:00:00"
+    val x2 = "2015-25-07 02:02:02"
+    val x3 = "2015-07-24 25:02:02"
+    val x4 = "2015-24-07 26:02:02"
+    val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
+    val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
+
+    val df1 = Seq(x1, x2, x3, x4).toDF("x")
+    checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
+      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+    checkAnswer(df1.selectExpr("unix_timestamp(x)"), Seq(
+      Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null)))
+    checkAnswer(df1.select(unix_timestamp(col("x"), "yyyy-dd-MM HH:mm:ss")), Seq(
+      Row(ts2.getTime / 1000L), Row(null), Row(null), Row(null)))
+    checkAnswer(df1.selectExpr(s"unix_timestamp(x, 'yyyy-MM-dd mm:HH:ss')"), Seq(
+      Row(ts3.getTime / 1000L), Row(ts4.getTime / 1000L), Row(null), Row(null)))
--- End diff --

Shouldn't the order be `Row(ts4.getTime / 1000L), Row(null), Row(ts3.getTime / 1000L), Row(null)`? It does not matter for testing since we sort results, but it makes it less confusing.
[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16870#discussion_r100523109

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala ---
@@ -95,9 +95,12 @@ object DateTimeUtils {
     sdf
   }
 
-  def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
+  def newDateFormat(formatString: String,
+      timeZone: TimeZone,
+      isLenient: Boolean = true): DateFormat = {
--- End diff --

Let's not make this a default parameter.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16785

@viirya this looks like a very big hammer to solve this problem. Can't we try a different approach? I think we should try to avoid optimizing already optimized code snippets; you might be able to do this using some kind of a fence. It would be even better if we had a recursive node.
[GitHub] spark pull request #16818: [SPARK-19451][SQL][Core] Underlying integer overf...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16818#discussion_r100539109

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/BoundOrdering.scala ---
@@ -25,18 +25,22 @@ import org.apache.spark.sql.catalyst.expressions.Projection
  * Function for comparing boundary values.
  */
 private[window] abstract class BoundOrdering {
-  def compare(inputRow: InternalRow, inputIndex: Int, outputRow: InternalRow, outputIndex: Int): Int
+  def compare(
+      inputRow: InternalRow,
+      inputIndex: Long,
+      outputRow: InternalRow,
+      outputIndex: Long): Long
 }
 
 /**
  * Compare the input index to the bound of the output index.
  */
-private[window] final case class RowBoundOrdering(offset: Int) extends BoundOrdering {
+private[window] final case class RowBoundOrdering(offset: Long) extends BoundOrdering {
--- End diff --

It does not make any sense to make the offsets longs. This is an execution detail: these are buffer indexes (which are integer bound), and you really should not be messing with those. Try to keep your change more local, and only modify `WindowExec.createBoundOrdering` and the code generating the `WindowExec.windowFrameExpressionFactoryPairs`. That should be enough.
[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16870#discussion_r100539828

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala ---
@@ -465,15 +465,15 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
     copy(timeZoneId = Option(timeZoneId))
 
   override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
-    val df = DateTimeUtils.newDateFormat(format.toString, timeZone)
+    val df = DateTimeUtils.newDateFormat(format.toString, timeZone, isLenient = true)
     UTF8String.fromString(df.format(new java.util.Date(timestamp.asInstanceOf[Long] / 1000)))
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
     val tz = ctx.addReferenceMinorObj(timeZone)
     defineCodeGen(ctx, ev, (timestamp, format) => {
-      s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), $tz)
+      s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), $tz, false)
--- End diff --

Why is this one not lenient?
[GitHub] spark pull request #16886: [SPARK-19548][SQL] Support Hive UDFs which return...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16886#discussion_r100544404

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ---
@@ -218,22 +220,33 @@ private[hive] trait HiveInspectors {
     case c: Class[_] if c == java.lang.Float.TYPE => FloatType
     case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
 
-    case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
+    case c: Class[_] if c.isArray => ArrayType(javaTypeToDataType(c.getComponentType))
 
     // Hive seems to return this for struct types?
     case c: Class[_] if c == classOf[java.lang.Object] => NullType
 
-    // java list type unsupported
+    // raw java list type unsupported
     case c: Class[_] if c == classOf[java.util.List[_]] => throw new AnalysisException(
-      "List type in java is unsupported because " +
-        "JVM type erasure makes spark fail to catch a component type in List<>")
+      "Raw list type in java is unsupported because Spark cannot infer the element type.")
 
-    // java map type unsupported
+    // raw java map type unsupported
     case c: Class[_] if c == classOf[java.util.Map[_, _]] => throw new AnalysisException(
-      "Map type in java is unsupported because " +
-        "JVM type erasure makes spark fail to catch key and value types in Map<>")
+      "Raw map type in java is unsupported because Spark cannot infer key and value types.")
+
+    case p: ParameterizedType if p.getRawType == classOf[java.util.List[_]] =>
+      val Array(elementType) = p.getActualTypeArguments
+      ArrayType(javaTypeToDataType(elementType))
+
+    case p: ParameterizedType if p.getRawType == classOf[java.util.Map[_, _]] =>
+      val Array(keyType, valueType) = p.getActualTypeArguments
+      MapType(javaTypeToDataType(keyType), javaTypeToDataType(valueType))
+
+    case _: WildcardType =>
+      throw new AnalysisException(
+        "Collection types with wildcards (e.g. List<?> or Map<?, ?>) are unsupported because " +
+          "Spark cannot infer the data type for these type parameters.")
--- End diff --

`BoundedType` is a mockito class and not a JVM class. A bound type that cannot be translated to a `DataType` is caught by the final case in the match.
[GitHub] spark pull request #16886: [SPARK-19548][SQL] Support Hive UDFs which return...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16886#discussion_r100545658

--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ---
@@ -218,22 +220,33 @@ private[hive] trait HiveInspectors {
     case c: Class[_] if c == java.lang.Float.TYPE => FloatType
     case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
 
-    case c: Class[_] if c.isArray => ArrayType(javaClassToDataType(c.getComponentType))
+    case c: Class[_] if c.isArray => ArrayType(javaTypeToDataType(c.getComponentType))
 
     // Hive seems to return this for struct types?
     case c: Class[_] if c == classOf[java.lang.Object] => NullType
 
-    // java list type unsupported
+    // raw java list type unsupported
     case c: Class[_] if c == classOf[java.util.List[_]] => throw new AnalysisException(
-      "List type in java is unsupported because " +
-        "JVM type erasure makes spark fail to catch a component type in List<>")
+      "Raw list type in java is unsupported because Spark cannot infer the element type.")
--- End diff --

It is quite likely that a user/developer will make a mistake for either a list or a map. I think these errors are more informative than the generic error, so I would like to retain them for the sake of user experience.
[GitHub] spark pull request #15928: [SPARK-18478][SQL] Support codegen'd Hive UDFs
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15928#discussion_r100559409

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFsBenchmark.scala ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.hive.ql.udf.UDFToDouble
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveUDFsBenchmark extends BenchmarkBase with TestHiveSingleton {
+
+  ignore("HiveSimpleUDF") {
+    val N = 2L << 26
+    sparkSession.range(N).createOrReplaceTempView("t")
+    sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[UDFToDouble].getName}'")
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    Call Hive UDF:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ----------------------------------------------------------------------------------------
+    Call Hive UDF wholestage off               3 / 3        43794.0           0.0       1.0X
+    Call Hive UDF wholestage on                1 / 2       101551.3           0.0       2.3X
--- End diff --

3 vs 1 ms? The results are probably not realistic. Can you make the benchmark at least 3000x bigger?
[GitHub] spark pull request #15928: [SPARK-18478][SQL] Support codegen'd Hive UDFs
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/15928#discussion_r100561335

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/execution/benchmark/HiveUDFsBenchmark.scala ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.hadoop.hive.ql.udf.UDFToDouble
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs
+
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class HiveUDFsBenchmark extends BenchmarkBase with TestHiveSingleton {
+
+  ignore("HiveSimpleUDF") {
+    val N = 2L << 26
+    sparkSession.range(N).createOrReplaceTempView("t")
+    sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[UDFToDouble].getName}'")
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    Call Hive UDF:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ----------------------------------------------------------------------------------------
+    Call Hive UDF wholestage off               3 / 3        43794.0           0.0       1.0X
+    Call Hive UDF wholestage on                1 / 2       101551.3           0.0       2.3X
+    */
+    runBenchmark("Call Hive UDF", N) {
+      sparkSession.sql("SELECT f(id) FROM t")
+    }
+    sparkSession.sql("DROP TEMPORARY FUNCTION IF EXISTS f")
+  }
+
+  ignore("HiveGenericUDF") {
+    val N = 2L << 26
+    sparkSession.range(N).createOrReplaceTempView("t")
+    sparkSession.sql(s"CREATE TEMPORARY FUNCTION f AS '${classOf[GenericUDFAbs].getName}'")
+
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    Call Hive generic UDF:           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ----------------------------------------------------------------------------------------
+    Call Hive generic UDF wholestage off       2 / 2        86919.9           0.0       1.0X
--- End diff --

Same comment. 3 vs 1 ms???
[GitHub] spark pull request #16887: [SPARK-19549] Allow providing reason for stage/jo...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16887#discussion_r100573520

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -2207,10 +2207,32 @@ class SparkContext(config: SparkConf) extends Logging {
    * Cancel a given job if it's scheduled or running.
    *
    * @param jobId the job ID to cancel
+   * @param reason optional reason for cancellation
    * @note Throws `InterruptedException` if the cancel message cannot be sent
    */
-  def cancelJob(jobId: Int) {
-    dagScheduler.cancelJob(jobId)
+  def cancelJob(jobId: Int, reason: String): Unit = {
+    dagScheduler.cancelJob(jobId, Some(reason))
--- End diff --

Please use `Option(reason)`; `Some(reason)` will happily wrap a null.
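The distinction the comment relies on, in a self-contained snippet: `Option(...)` normalizes a null reference to `None`, whereas `Some(...)` happily wraps the null, which then leaks into any code that treats the option as non-empty.

    val reason: String = null
    val wrapped = Some(reason)    // Some(null): a non-empty Option whose payload is null
    val safe    = Option(reason)  // None: the null is normalized away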
[GitHub] spark pull request #16886: [SPARK-19548][SQL] Support Hive UDFs which return...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16886#discussion_r100595034

--- Diff: sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive.execution;
+
+import org.apache.hadoop.hive.ql.exec.UDF;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * UDF that returns a raw (non-parameterized) java List.
+ */
+public class UDFRawList extends UDF {
--- End diff --

Ok, all files in that dir are indented with 4 spaces. I can modify those if you want me to.
[GitHub] spark issue #16887: [SPARK-19549] Allow providing reason for stage/job cance...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16887

retest this please
[GitHub] spark issue #16894: [SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNull Const...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16894

retest this please
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100756976

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -97,6 +98,11 @@ case class SortMergeJoinExec(
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
 
+    val spillThreshold =
+      sqlContext.conf.getConfString(
+        "spark.sql.sortMergeJoinExec.buffer.spill.threshold",
--- End diff --

Let's also move this configuration into `SQLConf`.
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100757130

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -376,8 +386,15 @@ case class SortMergeJoinExec(
     // A list to hold all matched rows from right side.
     val matches = ctx.freshName("matches")
-    val clsName = classOf[java.util.ArrayList[InternalRow]].getName
-    ctx.addMutableState(clsName, matches, s"$matches = new $clsName();")
+    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
+
+    val spillThreshold =
--- End diff --

Place it in a def/lazy val if we need it more than once?
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100756129

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -285,6 +283,9 @@ case class WindowExec(
     val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
 
+    var spillThreshold =
+      sqlContext.conf.getConfString("spark.sql.windowExec.buffer.spill.threshold", "4096").toInt
--- End diff --

Please make an internal configuration in `SQLConf` for this.
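A hedged sketch of the kind of internal `SQLConf` entry being asked for, mirroring the hard-coded key and default in the quoted diff; the exact builder entry point, wording, and accessor are assumptions, not the merged definition, and the same pattern would apply to the sortMergeJoinExec key above:

    val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
      buildConf("spark.sql.windowExec.buffer.spill.threshold")
        .internal()
        .doc("Threshold for number of rows buffered in the window operator before spilling.")
        .intConf
        .createWithDefault(4096)

    // WindowExec could then read the typed value instead of parsing a string:
    //   val spillThreshold = sqlContext.conf.getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)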
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100756047

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala ---
@@ -310,10 +311,15 @@ case class WindowExec(
       fetchNextRow()
 
       // Manage the current partition.
-      val rows = ArrayBuffer.empty[UnsafeRow]
       val inputFields = child.output.length
-      var sorter: UnsafeExternalSorter = null
       var rowBuffer: RowBuffer = null
+      if (sqlContext == null) {
--- End diff --

???
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100756809

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala ---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
+
+/**
+ * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient
+ * space for it to grow.
+ *
+ * Setting spill threshold faces following trade-off:
+ *
+ * - If the spill threshold is too high, the in-memory array may occupy more memory than is
+ *   available, resulting in OOM.
+ * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
+ *   This may lead to a performance regression compared to the normal case of using an
+ *   [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging {
+  private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow]
--- End diff --

Perhaps it is better just to allocate an array, but that might be overkill.
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100755730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} + +/** + * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient + * space for it to grow. + * + * Setting spill threshold faces following trade-off: + * + * - If the spill threshold is too high, the in-memory array may occupy more memory than is + * available, resulting in OOM. + * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes. + * This may lead to a performance regression compared to the normal case of using an + * [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { + private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow] + + private var spillableArray: UnsafeExternalSorter = null + private var numElements = 0 + + // A counter to keep track of total additions made to this array since its creation. + // This helps to invalidate iterators when there are changes done to the backing array. + private var modCount: Long = 0 + + private var numFieldPerRow = 0 + + def length: Int = numElements + + def isEmpty: Boolean = numElements == 0 + + /** + * Clears up resources (eg. 
memory) held by the backing storage + */ + def clear(): Unit = { +if (spillableArray != null) { + // The last `spillableArray` of this task will be cleaned up via task completion listener + // inside `UnsafeExternalSorter` + spillableArray.cleanupResources() + spillableArray = null +} else { + inMemoryBuffer.clear() +} +numFieldPerRow = 0 +numElements = 0 +modCount += 1 + } + + def add(entry: InternalRow): Unit = { +val unsafeRow = entry.asInstanceOf[UnsafeRow] + +if (numElements < numRowsSpillThreshold) { + inMemoryBuffer += unsafeRow.copy() +} else { + if (spillableArray == null) { +logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " + + s"${classOf[org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray].getName}") + +// We will not sort the rows, so prefixComparator and recordComparator are null +spillableArray = UnsafeExternalSorter.create( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + null, + null, + if (numRowsSpillThreshold > 2) numRowsSpillThreshold / 2 else 1, + SparkEnv.get.memoryManager.pageSizeBytes, + numRowsSpillThreshold, + false) + +inMemoryBuffer.foreach(existingUnsafeRow => + spillableArray.insertRecord( +existingUnsafeRow.getBaseObject, +existingUnsafeRow.getBaseOffset, +existingUnsafeRow.getSizeInBytes, +0, +false) +) +inMemoryBuffer.clear() +numFieldPerRow = unsafeRow.numFields() +
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100755136 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} + +/** + * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient + * space for it to grow. + * + * Setting spill threshold faces following trade-off: + * + * - If the spill threshold is too high, the in-memory array may occupy more memory than is + * available, resulting in OOM. + * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes. + * This may lead to a performance regression compared to the normal case of using an + * [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { + private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow] + + private var spillableArray: UnsafeExternalSorter = null + private var numElements = 0 + + // A counter to keep track of total additions made to this array since its creation. + // This helps to invalidate iterators when there are changes done to the backing array. + private var modCount: Long = 0 + + private var numFieldPerRow = 0 + + def length: Int = numElements + + def isEmpty: Boolean = numElements == 0 + + /** + * Clears up resources (eg. memory) held by the backing storage + */ + def clear(): Unit = { +if (spillableArray != null) { + // The last `spillableArray` of this task will be cleaned up via task completion listener + // inside `UnsafeExternalSorter` + spillableArray.cleanupResources() + spillableArray = null +} else { + inMemoryBuffer.clear() +} +numFieldPerRow = 0 +numElements = 0 +modCount += 1 + } + + def add(entry: InternalRow): Unit = { +val unsafeRow = entry.asInstanceOf[UnsafeRow] --- End diff -- This seems tricky. Lets move this cast to the call site (and preferably avoid it). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
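A small, self-contained sketch of what "move the cast to the call site" would look like; generic types stand in for `InternalRow`/`UnsafeRow`, and all names are illustrative rather than the actual Spark code:

```scala
import scala.collection.mutable.ArrayBuffer

object CastAtCallSiteSketch extends App {
  trait Row
  final class UnsafeLikeRow(val id: Int) extends Row

  // The array only ever accepts the concrete row type, so it never needs a cast internally.
  class RowArray {
    private val rows = ArrayBuffer.empty[UnsafeLikeRow]
    def add(row: UnsafeLikeRow): Unit = rows += row
    def size: Int = rows.length
  }

  val arr = new RowArray
  val fromIterator: Row = new UnsafeLikeRow(1) // e.g. what an Iterator[InternalRow] hands back
  // The (ideally avoidable) narrowing happens where the rows are produced:
  arr.add(fromIterator.asInstanceOf[UnsafeLikeRow])
  println(arr.size)
}
```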
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100756696 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala --- @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} + +/** + * An append-only array for [[UnsafeRow]]s that spills content to disk when there is insufficient + * space for it to grow. + * + * Setting spill threshold faces following trade-off: + * + * - If the spill threshold is too high, the in-memory array may occupy more memory than is + * available, resulting in OOM. + * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes. + * This may lead to a performance regression compared to the normal case of using an + * [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class ExternalAppendOnlyUnsafeRowArray(numRowsSpillThreshold: Int) extends Logging { + private val inMemoryBuffer: ArrayBuffer[UnsafeRow] = ArrayBuffer.empty[UnsafeRow] + + private var spillableArray: UnsafeExternalSorter = null + private var numElements = 0 + + // A counter to keep track of total additions made to this array since its creation. + // This helps to invalidate iterators when there are changes done to the backing array. + private var modCount: Long = 0 --- End diff -- This is only an issue after we have cleared the buffer right? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
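The question is about the iterator-invalidation pattern behind `modCount`. A self-contained sketch of that pattern (simplified; in this version any structural change, `add` or `clear`, bumps the counter and invalidates outstanding iterators):

```scala
import java.util.ConcurrentModificationException

object ModCountSketch extends App {
  class InvalidatingArray[T] {
    private var data = Vector.empty[T]
    private var modCount = 0L

    def add(x: T): Unit = { data = data :+ x; modCount += 1 }
    def clear(): Unit = { data = Vector.empty[T]; modCount += 1 }

    def iterator: Iterator[T] = new Iterator[T] {
      private val expected = modCount          // snapshot taken when the iterator is created
      private val underlying = data.iterator
      private def check(): Unit =
        if (modCount != expected) throw new ConcurrentModificationException()
      override def hasNext: Boolean = { check(); underlying.hasNext }
      override def next(): T = { check(); underlying.next() }
    }
  }

  val arr = new InvalidatingArray[Int]
  arr.add(1); arr.add(2)
  val it = arr.iterator
  arr.clear() // invalidates `it`
  try it.next() catch { case _: ConcurrentModificationException => println("iterator invalidated") }
}
```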
[GitHub] spark pull request #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeR...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16909#discussion_r100757937 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/window/RowBuffer.scala --- @@ -17,99 +17,33 @@ package org.apache.spark.sql.execution.window -import scala.collection.mutable.ArrayBuffer - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator} - +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray /** - * The interface of row buffer for a partition. In absence of a buffer pool (with locking), the + * Represents row buffer for a partition. In absence of a buffer pool (with locking), the * row buffer is used to materialize a partition of rows since we need to repeatedly scan these * rows in window function processing. */ -private[window] abstract class RowBuffer { - - /** Number of rows. */ - def size: Int - - /** Return next row in the buffer, null if no more left. */ - def next(): InternalRow - - /** Skip the next `n` rows. */ - def skip(n: Int): Unit - - /** Return a new RowBuffer that has the same rows. */ - def copy(): RowBuffer -} - -/** - * A row buffer based on ArrayBuffer (the number of rows is limited). - */ -private[window] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer { - - private[this] var cursor: Int = -1 - - /** Number of rows. */ - override def size: Int = buffer.length - - /** Return next row in the buffer, null if no more left. */ - override def next(): InternalRow = { -cursor += 1 -if (cursor < buffer.length) { - buffer(cursor) -} else { - null -} - } - - /** Skip the next `n` rows. */ - override def skip(n: Int): Unit = { -cursor += n - } - - /** Return a new RowBuffer that has the same rows. */ - override def copy(): RowBuffer = { -new ArrayRowBuffer(buffer) - } -} - -/** - * An external buffer of rows based on UnsafeExternalSorter. - */ -private[window] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int) - extends RowBuffer { - - private[this] val iter: UnsafeSorterIterator = sorter.getIterator - - private[this] val currentRow = new UnsafeRow(numFields) +private[window] class RowBuffer(appendOnlyExternalArray: ExternalAppendOnlyUnsafeRowArray) { --- End diff -- Lets just drop row buffer in favor of `ExternalAppendOnlyUnsafeRowArray` it doesn't make a lot of sense to keep this around. We just need a `generateIterator(offset)` for the unbounded following case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
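A hedged sketch of the `generateIterator(offset)` idea: the window operator asks the backing array for an iterator that starts at a given row index, which is all the unbounded-following frame needs. Simplified and in-memory only, with illustrative names:

```scala
import scala.collection.mutable.ArrayBuffer

object GenerateIteratorSketch extends App {
  class AppendOnlyRowArray[T] {
    private val rows = ArrayBuffer.empty[T]
    def add(row: T): Unit = rows += row

    // Iterate over all rows starting at `startIndex`; a spilling implementation
    // would do the same by skipping records on the sorter's iterator.
    def generateIterator(startIndex: Int = 0): Iterator[T] = {
      require(startIndex >= 0 && startIndex <= rows.length, s"invalid start index: $startIndex")
      rows.iterator.drop(startIndex)
    }
  }

  val arr = new AppendOnlyRowArray[Int]
  (1 to 5).foreach(arr.add)
  println(arr.generateIterator(2).toList) // List(3, 4, 5)
}
```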
[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16870#discussion_r100771130 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala --- @@ -477,6 +483,27 @@ class DateFunctionsSuite extends QueryTest with SharedSQLContext { checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq( Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L))) +val x1 = "2015-07-24 10:00:00" +val x2 = "2015-25-07 02:02:02" +val x3 = "2015-07-24 25:02:02" +val x4 = "2015-24-07 26:02:02" +val ts3 = Timestamp.valueOf("2015-07-24 02:25:02") +val ts4 = Timestamp.valueOf("2015-07-24 00:10:00") + +val df1 = Seq(x1, x2, x3, x4).toDF("x") +checkAnswer(df1.select(unix_timestamp(col("x"))), Seq( + Row(ts1.getTime / 1000L), Row(null), Row(null), Row(null))) --- End diff -- @gatorsmile the ts1 var is defined at the beginning of the test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16870 LGTM - merging to master. Thanks!

[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16870 @windpiger can you open a backport to branch-2.1? Thanks!
[GitHub] spark pull request #16882: [SPARK-19544][SQL] Improve error message when som...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16882#discussion_r100774050 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala --- @@ -321,12 +321,12 @@ trait CheckAnalysis extends PredicateHelper { // Check if the data types match. dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) => // SPARK-18058: we shall not care about the nullability of columns -if (!dt1.sameType(dt2)) { +if (TypeCoercion.findWiderTypeForTwo(dt1.asNullable, dt2.asNullable).isEmpty) { failAnalysis( s""" |${operator.nodeName} can only be performed on tables with the compatible - |column types. $dt1 <> $dt2 at the ${ordinalNumber(ci)} column of - |the ${ordinalNumber(ti + 1)} table + |column types. ${dt1.simpleString} <> ${dt2.simpleString} at the --- End diff -- `StructType.simpleString` gets truncated, so it might be hard/impossible to find the error. Can you use `StructType.catalogString`? For better UX it might be an idea to render the path to the offending variable, instead of printing the entire struct. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
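A sketch of the two suggestions, using only public `DataType` API; the helper below is hypothetical (it is not part of CheckAnalysis) and compares types with plain equality for brevity rather than Spark's internal compatibility checks:

```scala
import org.apache.spark.sql.types._

object TypeMismatchPath {
  // Returns the dotted path and catalogString rendering of the first field pair
  // that differs; None when the two types line up.
  def firstMismatch(dt1: DataType, dt2: DataType, path: String = ""): Option[String] =
    (dt1, dt2) match {
      case (s1: StructType, s2: StructType) if s1.length == s2.length =>
        s1.fields.zip(s2.fields).flatMap { case (f1, f2) =>
          firstMismatch(f1.dataType, f2.dataType, s"$path.${f1.name}".stripPrefix("."))
        }.headOption
      case _ if dt1 == dt2 => None
      case _ => Some(s"$path: ${dt1.catalogString} <> ${dt2.catalogString}")
    }

  def main(args: Array[String]): Unit = {
    val a = new StructType().add("id", LongType).add("nested", new StructType().add("x", IntegerType))
    val b = new StructType().add("id", LongType).add("nested", new StructType().add("x", StringType))
    println(firstMismatch(a, b)) // Some(nested.x: int <> string)
  }
}
```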
[GitHub] spark issue #16882: [SPARK-19544][SQL] Improve error message when some colum...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16882 LGTM - merging to master. Thanks!
[GitHub] spark pull request #16872: [SPARK-19514] Making range interruptible.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16872#discussion_r100827652 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala --- @@ -127,4 +133,28 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext { } } } + + test("Cancelling stage in a query with Range.") { +val listener = new SparkListener { + override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { +Thread.sleep(100) --- End diff -- Just create another PR under the same ticket. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16921: [SPARK-19589][SQL] Removal of SQLGEN files
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16921 LGTM
[GitHub] spark issue #16921: [SPARK-19589][SQL] Removal of SQLGEN files
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16921 Merging to master. Thanks!
[GitHub] spark issue #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16909 @zhzhan is this an actual problem? The BufferedRowIterator should not hold a lot of rows in practice. cc @davies
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16870 Yeah, you are right. Let's leave this as it currently is.
[GitHub] spark pull request #16925: [SPARK-16475][SQL] Broadcast Hint for SQL Queries
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16925#discussion_r101022875 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala --- @@ -363,6 +363,29 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode { } /** + * A general hint for the child. This node will be eliminated post analysis. + * A pair of (name, parameters). + */ +case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode { + override lazy val resolved: Boolean = false + override def output: Seq[Attribute] = child.output +} + +/** + * Options for writing new data into a table. + * + * @param enabled whether to overwrite existing data in the table. + * @param specificPartition only data in the specified partition will be overwritten. + */ +case class OverwriteOptions( --- End diff -- What is this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16870 Yes it is
[GitHub] spark issue #16909: [SPARK-13450] Introduce ExternalAppendOnlyUnsafeRowArray...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16909 retest this please
[GitHub] spark pull request #16941: [SPARK-16475][SQL] broadcast hint for SQL queries...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16941#discussion_r101313928 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala --- @@ -505,7 +505,13 @@ class PlanParserSuite extends PlanTest { val m2 = intercept[ParseException] { parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t") }.getMessage -assert(m2.contains("no viable alternative at input")) +assert(m2.contains("mismatched input '.' expecting {')', ','}")) + +// Disallow space as the delimiter. +val m3 = intercept[ParseException] { --- End diff -- NIT: You can use the `PlanParserSuite.intercept` method. That saves some typing. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
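For illustration, this is roughly how the new assertion could be written with that helper, assuming `PlanParserSuite.intercept` takes the SQL text followed by expected message fragments; this is a test-suite fragment, not standalone code, and the truncated m3 block from the diff would collapse the same way:

```scala
// Sketch only: assumes intercept(sql, expectedMessageFragments*)
intercept(
  "SELECT /*+ MAPJOIN(default.t) */ * from default.t",
  "mismatched input '.' expecting {')', ','}")
```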
[GitHub] spark issue #16941: [SPARK-16475][SQL] broadcast hint for SQL queries - disa...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16941 LGTM pending jenkins
[GitHub] spark issue #16939: [SPARK-16475][SQL] broadcast hint for SQL queries - foll...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16939 LGTM - merging to master. Thanks.
[GitHub] spark issue #16759: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN s...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16759 LGTM - merging to master
[GitHub] spark issue #16798: [SPARK-18873][SQL][TEST] New test cases for scalar subqu...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16798 LGTM - merging to master
[GitHub] spark issue #16802: [SPARK-18872][SQL][TESTS] New test cases for EXISTS subq...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16802 LGTM - merging to master.
[GitHub] spark issue #16915: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN s...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16915 @kevinyu98 @nsyca @dilipbiswal could someone confirm that these results match DB2? I also think that this PR is almost too large.
[GitHub] spark pull request #16943: [SPARK-19607][HOTFIX] Finding QueryExecution that...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16943#discussion_r101381101 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala --- @@ -129,6 +129,8 @@ class SQLExecutionSuite extends SparkFunSuite { df.collect() assert(df.queryExecution === queryExecution) + +spark.stop() --- End diff -- @dongjoon-hyun should we do this in a `try..finally`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16943: [SPARK-19607][HOTFIX] Finding QueryExecution that...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16943#discussion_r101383173 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLExecutionSuite.scala --- @@ -129,6 +129,8 @@ class SQLExecutionSuite extends SparkFunSuite { df.collect() assert(df.queryExecution === queryExecution) + +spark.stop() --- End diff -- No, lets leave as it is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16958: [SPARK-13721][SQL] Make GeneratorOuter unresolved.
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16958 LGTM
[GitHub] spark pull request #16962: [SPARK-18120][SPARK-19557][SQL] Call QueryExecuti...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16962#discussion_r101724210 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala --- @@ -573,6 +575,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { format("csv").save(path) } + private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = { --- End diff -- Why don't we use `SQLExecution` instead of this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
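For reference, a hedged fragment of what leaning on the existing mechanism could look like; it assumes it sits inside the `org.apache.spark.sql` package (where `sessionState` is visible), uses the two-argument `withNewExecutionId` shape from the Spark 2.x code base, and is a sketch rather than the PR's implementation:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution

// Simplified signature; the diff's runCommand also carries a name parameter.
def runCommand(session: SparkSession, command: LogicalPlan): Unit = {
  val qe = session.sessionState.executePlan(command)
  // Registers a single execution id so the UI and listeners attribute the work correctly.
  SQLExecution.withNewExecutionId(session, qe) {
    qe.toRdd // side effect: forces the command to run
  }
}
```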
[GitHub] spark issue #16974: [SPARK-19646][CORE][STREAMING] binaryRecords replicates ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16974 LGTM - pending jenkins
[GitHub] spark issue #16974: [SPARK-19646][CORE][STREAMING] binaryRecords replicates ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16974 @srowen should we add a regression test? It seems weird that we didn't catch this in tests.
[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16960#discussion_r101782666 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } + test("range metrics") { +val res1 = InputOutputMetricsHelper.run( + spark.range(30).filter(x => x % 3 == 0).toDF() +) +assert(res1 === (30L, 0L, 30L) :: Nil) + +val res2 = InputOutputMetricsHelper.run( + spark.range(150).repartition(4).filter(x => x < 10).toDF() +) +assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil) + +withTempDir { tempDir => + val dir = new File(tempDir, "pqS").getCanonicalPath + + spark.range(10).write.parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("pqS") + + val res3 = InputOutputMetricsHelper.run( +spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() + ) + assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil) +} + } +} + +object InputOutputMetricsHelper { + private class InputOutputMetricsListener extends SparkListener { +private case class MetricsResult( --- End diff -- Nit: add space --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16960#discussion_r101782872 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } + test("range metrics") { +val res1 = InputOutputMetricsHelper.run( + spark.range(30).filter(x => x % 3 == 0).toDF() +) +assert(res1 === (30L, 0L, 30L) :: Nil) + +val res2 = InputOutputMetricsHelper.run( + spark.range(150).repartition(4).filter(x => x < 10).toDF() +) +assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil) + +withTempDir { tempDir => + val dir = new File(tempDir, "pqS").getCanonicalPath + + spark.range(10).write.parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("pqS") + + val res3 = InputOutputMetricsHelper.run( +spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() + ) + assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil) +} + } +} + +object InputOutputMetricsHelper { + private class InputOutputMetricsListener extends SparkListener { +private case class MetricsResult( +var recordsRead: Long = 0L, +var shuffleRecordsRead: Long = 0L, +var sumMaxOutputRows: Long = 0L) + +private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult] --- End diff -- Make this val. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16960#discussion_r101783584 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } + test("range metrics") { +val res1 = InputOutputMetricsHelper.run( + spark.range(30).filter(x => x % 3 == 0).toDF() +) +assert(res1 === (30L, 0L, 30L) :: Nil) + +val res2 = InputOutputMetricsHelper.run( + spark.range(150).repartition(4).filter(x => x < 10).toDF() +) +assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil) + +withTempDir { tempDir => + val dir = new File(tempDir, "pqS").getCanonicalPath + + spark.range(10).write.parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("pqS") + + val res3 = InputOutputMetricsHelper.run( +spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() + ) + assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil) +} + } +} + +object InputOutputMetricsHelper { + private class InputOutputMetricsListener extends SparkListener { +private case class MetricsResult( +var recordsRead: Long = 0L, +var shuffleRecordsRead: Long = 0L, +var sumMaxOutputRows: Long = 0L) + +private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult] + +def reset(): Unit = { + stageIdToMetricsResult = HashMap.empty[Int, MetricsResult] +} + +/** + * Return a list of recorded metrics aggregated per stage. + * + * The list is sorted in the ascending order on the stageId. + * For each recorded stage, the following tuple is returned: + * - sum of inputMetrics.recordsRead for all the tasks in the stage + * - sum of shuffleReadMetrics.recordsRead for all the tasks in the stage + * - sum of the highest values of "number of output rows" metric for all the tasks in the stage + */ +def getResults(): List[(Long, Long, Long)] = { + stageIdToMetricsResult.keySet.toList.sorted.map({ stageId => +val res = stageIdToMetricsResult(stageId) +(res.recordsRead, res.shuffleRecordsRead, res.sumMaxOutputRows)}) +} + +override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { + val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, { MetricsResult() }) + + res.recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead + res.shuffleRecordsRead += taskEnd.taskMetrics.shuffleReadMetrics.recordsRead + + var maxOutputRows = 0L + for (accum <- taskEnd.taskMetrics.externalAccums) { +val info = accum.toInfo(Some(accum.value), None) +if (info.name.toString.contains("number of output rows")) { + info.update match { +case Some(n: Number) => + if (n.longValue() > maxOutputRows) { +maxOutputRows = n.longValue() + } +case _ => // Ignore. + } +} + } + res.sumMaxOutputRows += maxOutputRows +} + } + + // Run df.collect() and return aggregated metrics for each stage. + def run(df: DataFrame): List[(Long, Long, Long)] = { +val spark = df.sparkSession +val sparkContext = spark.sparkContext +val listener = new InputOutputMetricsListener() --- End diff -- Use try...finally here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
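A self-contained illustration of the try...finally being asked for here: attach the listener, run the job, and always detach it afterwards (generic listener and query, not the helper from the diff):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object ListenerTryFinallySketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("listener-sketch").getOrCreate()
  val sc = spark.sparkContext

  val listener = new SparkListener {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
      println(s"task finished in stage ${taskEnd.stageId}")
  }

  sc.addSparkListener(listener)
  try {
    spark.range(0, 1000).repartition(4).count()
  } finally {
    // Runs even if the job above throws, so the listener never leaks into later work.
    sc.removeSparkListener(listener)
    spark.stop()
  }
}
```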
[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16960#discussion_r101783889 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } + test("range metrics") { +val res1 = InputOutputMetricsHelper.run( + spark.range(30).filter(x => x % 3 == 0).toDF() +) +assert(res1 === (30L, 0L, 30L) :: Nil) + +val res2 = InputOutputMetricsHelper.run( + spark.range(150).repartition(4).filter(x => x < 10).toDF() +) +assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil) + +withTempDir { tempDir => + val dir = new File(tempDir, "pqS").getCanonicalPath + + spark.range(10).write.parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("pqS") + + val res3 = InputOutputMetricsHelper.run( +spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() + ) + assert(res3 === (10L, 0L, 10L) :: (30L, 0L, 30L) :: (0L, 30L, 300L) :: (0L, 300L, 0L) :: Nil) +} + } +} + +object InputOutputMetricsHelper { + private class InputOutputMetricsListener extends SparkListener { +private case class MetricsResult( +var recordsRead: Long = 0L, +var shuffleRecordsRead: Long = 0L, +var sumMaxOutputRows: Long = 0L) + +private[this] var stageIdToMetricsResult = HashMap.empty[Int, MetricsResult] + +def reset(): Unit = { + stageIdToMetricsResult = HashMap.empty[Int, MetricsResult] +} + +/** + * Return a list of recorded metrics aggregated per stage. + * + * The list is sorted in the ascending order on the stageId. + * For each recorded stage, the following tuple is returned: + * - sum of inputMetrics.recordsRead for all the tasks in the stage + * - sum of shuffleReadMetrics.recordsRead for all the tasks in the stage + * - sum of the highest values of "number of output rows" metric for all the tasks in the stage + */ +def getResults(): List[(Long, Long, Long)] = { + stageIdToMetricsResult.keySet.toList.sorted.map({ stageId => +val res = stageIdToMetricsResult(stageId) +(res.recordsRead, res.shuffleRecordsRead, res.sumMaxOutputRows)}) +} + +override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized { + val res = stageIdToMetricsResult.getOrElseUpdate(taskEnd.stageId, { MetricsResult() }) --- End diff -- Nit remove curly braces --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16960: [SPARK-19447] Make Range operator generate "recor...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16960#discussion_r101784563 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala --- @@ -309,4 +314,94 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { assert(metricInfoDeser.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } + test("range metrics") { +val res1 = InputOutputMetricsHelper.run( + spark.range(30).filter(x => x % 3 == 0).toDF() +) +assert(res1 === (30L, 0L, 30L) :: Nil) + +val res2 = InputOutputMetricsHelper.run( + spark.range(150).repartition(4).filter(x => x < 10).toDF() +) +assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil) + +withTempDir { tempDir => + val dir = new File(tempDir, "pqS").getCanonicalPath + + spark.range(10).write.parquet(dir) + spark.read.parquet(dir).createOrReplaceTempView("pqS") + + val res3 = InputOutputMetricsHelper.run( +spark.range(0, 30).repartition(3).crossJoin(sql("select * from pqS")).repartition(2).toDF() --- End diff -- This is hard to reason about. Could you add a few lines of documentation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16960: [SPARK-19447] Make Range operator generate "recordsRead"...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16960 LGTM - pending jenkins.
[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16785 cc @sameeragarwal
[GitHub] spark issue #16998: [SPARK-19665][SQL][WIP] Improve constraint propagation
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/16998 @viirya does this PR supersede #16785? I do like the non-parallel approach. I will try to take a more in-depth look at the end of the week (beginning of the next sprint).
[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16985#discussion_r101973786 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -33,8 +33,8 @@ import org.apache.spark.util.collection.BitSet * Performs a sort merge join of two child relations. */ case class SortMergeJoinExec( -leftKeys: Seq[Expression], -rightKeys: Seq[Expression], +var leftKeys: Seq[Expression], --- End diff -- What information are you missing? The SortMergeExec is replaced after each planning iteration. I would prefer that we use a `lazy val` here instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
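A generic, self-contained sketch of the `lazy val` preference: keep the constructor parameters immutable and derive the reordered keys once, on first access (all names are hypothetical; this is not the SortMergeJoinExec change itself):

```scala
object LazyValReorderSketch extends App {
  case class JoinNode(leftKeys: Seq[String], rightKeys: Seq[String], childOutputOrder: Seq[String]) {
    // Computed on first use, after the node is fully constructed; no `var` parameters needed.
    lazy val reorderedKeys: (Seq[String], Seq[String]) = {
      val position = childOutputOrder.zipWithIndex.toMap
      val pairs = leftKeys.zip(rightKeys).sortBy { case (l, _) => position.getOrElse(l, Int.MaxValue) }
      (pairs.map(_._1), pairs.map(_._2))
    }
  }

  val node = JoinNode(Seq("b", "a"), Seq("y", "x"), childOutputOrder = Seq("a", "b"))
  println(node.reorderedKeys) // (List(a, b),List(x, y))
}
```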
[GitHub] spark pull request #16610: [SPARK-19254][SQL] Support Seq, Map, and Struct i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16610#discussion_r101974502 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -102,6 +102,27 @@ object functions { Column(literalExpr) } + /** + * Creates a [[Column]] of literal value. + * + * The passed in object is returned directly if it is already a [[Column]]. + * If the object is a Scala Symbol, it is converted into a [[Column]] also. + * Otherwise, a new [[Column]] is created to represent the literal value. + * + * @group normal_funcs + * @since 2.2.0 + */ + def typedLit[T : TypeTag](literal: T): Column = { --- End diff -- cc @cloud-fan WDYT? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16610: [SPARK-19254][SQL] Support Seq, Map, and Struct i...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16610#discussion_r101974913 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -102,6 +102,27 @@ object functions { Column(literalExpr) } + /** + * Creates a [[Column]] of literal value. + * + * The passed in object is returned directly if it is already a [[Column]]. + * If the object is a Scala Symbol, it is converted into a [[Column]] also. + * Otherwise, a new [[Column]] is created to represent the literal value. + * + * @group normal_funcs + * @since 2.2.0 + */ + def typedLit[T : TypeTag](literal: T): Column = { +literal match { --- End diff -- This match statement is slightly hair raising (I know this is copied from `lit(...), how about: ```scala literal match { case c: Column => c case s: Symbol => new ColumnName(s.name) case _ => Column(Literal.create(literal)) } ``` You could also consider mapping the untyped `lit(..)` function to this function. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
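As a follow-up to the last sentence, the delegation could be as small as the fragment below, shown as it would appear inside `object functions`; it assumes `Literal.create` degrades gracefully when no useful `TypeTag` is available, which is an assumption of this sketch rather than something established in the diff:

```scala
// Hypothetical one-liner inside org.apache.spark.sql.functions:
def lit(literal: Any): Column = typedLit(literal)
```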
[GitHub] spark pull request #16608: [SPARK-13721][SQL] Support outer generators in Da...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16608#discussion_r102335398 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala --- @@ -163,9 +163,11 @@ object FunctionRegistry { expression[Abs]("abs"), expression[Coalesce]("coalesce"), expression[Explode]("explode"), +expressionGeneratorOuter[Explode]("explode_outer"), --- End diff -- @gatorsmile it uses the expression description of the underlying expression: https://github.com/apache/spark/pull/16608/files#diff-2c0350957ac4932d3f63796eceaeae08R517 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17027: [SPARK-19650] Runnable commands should not trigge...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/17027 [SPARK-19650] Runnable commands should not trigger a Spark job [WIP] ## What changes were proposed in this pull request? Spark executes SQL commands eagerly. It does this by materializing an RDD (which triggers execution of the actual command) with the command's results. The downside to this approach is that this also triggers a Spark job, which is quite expensive and unnecessary. This PR fixes this by avoiding the materialization of an `RDD` for `RunnableCommands`; it just calls `executedPlan.collectToIterate` to trigger the execution and wraps the `executedPlan` with a `MaterializedPlan` to avoid another execution of the plan. ## How was this patch tested? *TODO* You can merge this pull request into a Git repository by running: $ git pull https://github.com/hvanhovell/spark no-job-command Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17027.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17027 commit 4eea40baf569fe989ac6ec0d723259f1ab886ed3 Author: Herman van Hovell Date: 2017-02-17T22:09:02Z Do not trigger a job for runnable commands unless we have to. commit bd379340d16ac1f75b4b94cb739fb2db2a18dbb8 Author: Herman van Hovell Date: 2017-02-22T13:51:46Z Introduce materialized plan
[GitHub] spark issue #17026: [SPARK-13721][SQL] Make GeneratorOuter unresolved.
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/17026

LGTM - merging to master. Thanks!
[GitHub] spark pull request #17028: [SPARK-19691][SQL] Fix ClassCastException when ca...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/17028#discussion_r102490587

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala ---
@@ -138,7 +138,8 @@ case class Percentile(
   override def update(
       buffer: OpenHashMap[Number, Long],
       input: InternalRow): OpenHashMap[Number, Long] = {
-    val key = child.eval(input).asInstanceOf[Number]
+    val scalaValue = CatalystTypeConverters.convertToScala(child.eval(input), child.dataType)
--- End diff --

I think it is better to open up the signature of the `OpenHashMap` and use `Ordered` or `AnyRef` as its key type.
[GitHub] spark pull request #16608: [SPARK-13721][SQL] Support outer generators in Da...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16608#discussion_r102535917

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---
@@ -163,9 +163,11 @@ object FunctionRegistry {
     expression[Abs]("abs"),
     expression[Coalesce]("coalesce"),
     expression[Explode]("explode"),
+    expressionGeneratorOuter[Explode]("explode_outer"),
--- End diff --

Why would we need an update? What is the extra information you want to convey? Do you want to add a generic line saying that an outer generator might produce nulls instead of filtering out the row?
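For context, a small hedged sketch of the behavioral difference being discussed: `explode` drops rows whose array is empty or null, while `explode_outer` keeps them and emits a null element instead. This assumes Spark 2.2+, where `explode_outer` is available; the data and column names are made up.

```scala
// Sketch of explode vs explode_outer on a DataFrame with an empty array.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, explode_outer}

object OuterGeneratorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("outer-generator").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, Seq("a", "b")),
      (2, Seq.empty[String])
    ).toDF("id", "items")

    // explode drops row 2 because its array is empty.
    df.select($"id", explode($"items").as("item")).show()

    // explode_outer keeps row 2 and emits a null item instead.
    df.select($"id", explode_outer($"items").as("item")).show()

    spark.stop()
  }
}
```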
[GitHub] spark pull request #16608: [SPARK-13721][SQL] Support outer generators in Da...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/16608#discussion_r102593931

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ---
@@ -163,9 +163,11 @@ object FunctionRegistry {
     expression[Abs]("abs"),
     expression[Coalesce]("coalesce"),
     expression[Explode]("explode"),
+    expressionGeneratorOuter[Explode]("explode_outer"),
--- End diff --

I am not super enthusiastic about this. We have three options here:
1. Leave it as it is.
2. Remove the `outer_...` generators, and make users use `lateral view outer ...` instead.
3. Create separate OuterGenerator classes for each one, and provide proper documentation.

I am fine with any.
[GitHub] spark pull request #17030: [SPARK-19459] Support for nested char/varchar fie...
GitHub user hvanhovell opened a pull request: https://github.com/apache/spark/pull/17030

[SPARK-19459] Support for nested char/varchar fields in ORC

## What changes were proposed in this pull request?
This PR is a small follow-up on https://github.com/apache/spark/pull/16804. It adds support for nested char/varchar fields in ORC.

## How was this patch tested?
I have added a regression test to `OrcSourceSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hvanhovell/spark SPARK-19459-follow-up

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17030.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17030

commit e832ce68e1717cd5b8f2f8e25cf7b5e181abedaf
Author: Herman van Hovell
Date: 2017-02-22T23:10:28Z

    Allow for nested char/varchar fields in ORC
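A hedged illustration of the kind of schema this follow-up is about: char/varchar columns that only appear nested inside a struct in a Hive ORC table. The table and field names are hypothetical, the snippet requires a Hive-enabled Spark build, and the actual regression test in the PR may look different.

```scala
// Hypothetical nested char/varchar ORC table; names are made up for illustration.
import org.apache.spark.sql.SparkSession

object NestedCharVarcharSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nested-char-varchar")
      .enableHiveSupport()
      .getOrCreate()

    // char/varchar appear only inside the struct, which is the case this PR covers.
    spark.sql(
      """CREATE TABLE orc_nested_chars (
        |  id INT,
        |  info STRUCT<name: CHAR(10), code: VARCHAR(20)>
        |) STORED AS ORC""".stripMargin)

    spark.sql("INSERT INTO orc_nested_chars SELECT 1, named_struct('name', 'abc', 'code', 'xyz')")
    spark.sql("SELECT info.name, info.code FROM orc_nested_chars").show()

    spark.stop()
  }
}
```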
[GitHub] spark issue #17030: [SPARK-19459] Support for nested char/varchar fields in ...
Github user hvanhovell commented on the issue: https://github.com/apache/spark/pull/17030

cc @cloud-fan
[GitHub] spark pull request #17028: [SPARK-19691][SQL] Fix ClassCastException when ca...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/17028#discussion_r102673129

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala ---
@@ -130,20 +130,30 @@ case class Percentile(
     }
   }

-  override def createAggregationBuffer(): OpenHashMap[Number, Long] = {
+  private def toLongValue(d: Any): Long = d match {
+    case d: Decimal => d.toLong
+    case n: Number => n.longValue
+  }
+
+  private def toDoubleValue(d: Any): Double = d match {
+    case d: Decimal => d.toDouble
+    case n: Number => n.doubleValue
+  }
+
+  override def createAggregationBuffer(): OpenHashMap[AnyRef, Long] = {
     // Initialize new counts map instance here.
-    new OpenHashMap[Number, Long]()
+    new OpenHashMap[AnyRef, Long]()
   }

   override def update(
-      buffer: OpenHashMap[Number, Long],
-      input: InternalRow): OpenHashMap[Number, Long] = {
-    val key = child.eval(input).asInstanceOf[Number]
+      buffer: OpenHashMap[AnyRef, Long],
+      input: InternalRow): OpenHashMap[AnyRef, Long] = {
+    val key = child.eval(input).asInstanceOf[AnyRef]
     val frqValue = frequencyExpression.eval(input)

     // Null values are ignored in counts map.
     if (key != null && frqValue != null) {
-      val frqLong = frqValue.asInstanceOf[Number].longValue()
+      val frqLong = toLongValue(frqValue)
--- End diff --

`frqValue` is guaranteed to return an integral value, so this is not needed. We could also force it to be a Long; that would make this even simpler.
[GitHub] spark pull request #17028: [SPARK-19691][SQL] Fix ClassCastException when ca...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/17028#discussion_r102673567

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala ---
@@ -274,7 +283,8 @@ case class Percentile(
       val row = new UnsafeRow(2)
       row.pointTo(bs, sizeOfNextRow)
       // Insert the pairs into counts map.
-      val key = row.get(0, child.dataType).asInstanceOf[Number]
+      val catalystValue = row.get(0, child.dataType)
--- End diff --

NIT: Just change the cast in the old code.
[GitHub] spark pull request #17028: [SPARK-19691][SQL] Fix ClassCastException when ca...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/17028#discussion_r102673952

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala ---
@@ -39,44 +38,44 @@ class PercentileSuite extends SparkFunSuite {
     val agg = new Percentile(BoundReference(0, IntegerType, true), Literal(0.5))

     // Check empty serialize and deserialize
-    val buffer = new OpenHashMap[Number, Long]()
+    val buffer = new OpenHashMap[AnyRef, Long]()
     assert(compareEquals(agg.deserialize(agg.serialize(buffer)), buffer))

     // Check non-empty buffer serializa and deserialize.
     data.foreach { key =>
-      buffer.changeValue(key, 1L, _ + 1L)
+      buffer.changeValue(new Integer(key), 1L, _ + 1L)
     }
     assert(compareEquals(agg.deserialize(agg.serialize(buffer)), buffer))
   }

   test("class Percentile, high level interface, update, merge, eval...") {
     val count = 1
     val percentages = Seq(0, 0.25, 0.5, 0.75, 1)
-    val expectedPercentiles = Seq(1, 2500.75, 5000.5, 7500.25, 1)
+    val expectedPercentiles = Seq[Double](1, 2500.75, 5000.5, 7500.25, 1)
--- End diff --

Do we need to type this?
[GitHub] spark pull request #17028: [SPARK-19691][SQL] Fix ClassCastException when ca...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/17028#discussion_r102673905

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala ---
@@ -39,44 +38,44 @@ class PercentileSuite extends SparkFunSuite {
     val agg = new Percentile(BoundReference(0, IntegerType, true), Literal(0.5))

     // Check empty serialize and deserialize
-    val buffer = new OpenHashMap[Number, Long]()
+    val buffer = new OpenHashMap[AnyRef, Long]()
     assert(compareEquals(agg.deserialize(agg.serialize(buffer)), buffer))

     // Check non-empty buffer serializa and deserialize.
     data.foreach { key =>
-      buffer.changeValue(key, 1L, _ + 1L)
+      buffer.changeValue(new Integer(key), 1L, _ + 1L)
--- End diff --

Do we need to explicitly box this? I thought Scala boxed automatically.
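To answer the boxing question, a minimal plain-Scala sketch (no Spark classes; `mutable.Map` stands in for `OpenHashMap`): `scala.Int` is a value type that does not conform to `AnyRef`, so a value used as an `AnyRef` key has to be boxed explicitly, for example with `Int.box`.

```scala
// Plain-Scala sketch of the boxing question above. An Int cannot be used
// directly where an AnyRef is expected, so an explicit box (Int.box or
// java.lang.Integer.valueOf) is needed; `new Integer(...)` also works but
// that constructor was later deprecated in Java 9.
import scala.collection.mutable

object BoxingSketch {
  def main(args: Array[String]): Unit = {
    val buffer = mutable.Map.empty[AnyRef, Long]
    val key: Int = 42

    // val ref: AnyRef = key        // does not compile: Int is not an AnyRef
    val ref: AnyRef = Int.box(key)  // boxes to java.lang.Integer explicitly

    buffer.update(ref, buffer.getOrElse(ref, 0L) + 1L)
    println(buffer) // Map(42 -> 1)
  }
}
```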